将带有 int 标志的列转换为 pyspark 中的字符串数组

Transform column with int flags to array of strings in pyspark

我有一个数据框,其中有一列名为 "traits",它是由多个标志组成的整数。

我需要将此列转换为字符串列表(用于弹性搜索索引)。转换看起来像这样。

TRAIT_0 = 0
TRAIT_1 = 1
TRAIT_2 = 2
def flag_to_list(flag: int) -> List[str]:
    trait_list = []
    if flag & (1 << TRAIT_0):
        trait_list.append("TRAIT_0")
    elif flag & (1 << TRAIT_1):
        trait_list.append("TRAIT_1")
    elif flag & (1 << TRAIT_2):
        trait_list.append("TRAIT_2")

    return trait_list

在 pyspark 中进行此转换的最有效方法是什么?我看到很多关于如何进行字符串连接和拆分的示例,但没有像这样的操作。

使用 pyspark 版本 2.4.5

输入 json 如下所示: { "name": "John Doe", "traits": 5 } 输出 json 应如下所示: { "name": "John Doe", "traits": ["TRAIT_0", "TRAIT_2"] }

我们可以定义一个UDF来包装你的函数然后调用它。这是一些示例代码:

from typing import List
from pyspark.sql.types import ArrayType, StringType

TRAIT_0 = 0
TRAIT_1 = 1
TRAIT_2 = 2


def flag_to_list(flag: int) -> List[str]:
    trait_list = []
    if flag & (1 << TRAIT_0):
        trait_list.append("TRAIT_0")
    elif flag & (1 << TRAIT_1):
        trait_list.append("TRAIT_1")
    elif flag & (1 << TRAIT_2):
        trait_list.append("TRAIT_2")
    return trait_list


flag_to_list_udf = udf(lambda x: None if x is None else flag_to_list(x),
                       ArrayType(StringType()))

# Create dummy data to test
data = [
    { "name": "John Doe", "traits": 5 },
    { "name": "Jane Doe", "traits": 2 },
    { "name": "Jane Roe", "traits": 0 },
    { "name": "John Roe", "traits": 6 },
]
df = spark.createDataFrame(data, 'name STRING, traits INT')
df.show()
# +--------+------+
# |    name|traits|
# +--------+------+
# |John Doe|     5|
# |Jane Doe|     2|
# |Jane Roe|     0|
# |John Roe|     6|
# +--------+------+

df = df.withColumn('traits_processed', flag_to_list_udf(df['traits']))
df.show()
# +--------+------+----------------+
# |    name|traits|traits_processed|
# +--------+------+----------------+
# |John Doe|     5|       [TRAIT_0]|
# |Jane Doe|     2|       [TRAIT_1]|
# |Jane Roe|     0|              []|
# |John Roe|     6|       [TRAIT_1]|
# +--------+------+----------------+

如果您不想创建新列,可以将 traits_processed 替换为 traits

IIUC,可以试试SparkSQL内置函数:(1)使用conv + split to convert integer(base-10) -> binary(base-2) -> string -> array of strings(reversed), (2) based on 0 or 1 values and their array indices to filter and transform数组转化为对应的named traits数组:

from pyspark.sql.functions import expr

df = spark.createDataFrame([("name1", 5),("name2", 1),("name3", 0),("name4", 12)], ['name', 'traits'])
#DataFrame[name: string, traits: bigint]

traits = [ "Traits_{}".format(i) for i in range(8) ]
traits_array = "array({})".format(",".join("'{}'".format(e) for e in traits))
# array('Traits_0','Traits_1','Traits_2','Traits_3','Traits_4','Traits_5','Traits_6','Traits_7')

sql_expr = """

    filter(
      transform(
        /* convert int -> binary -> string -> array of strings, and then reverse the array */
        reverse(split(string(conv(traits,10,2)),'(?!$)')),
        /* take the corresponding items from the traits_array when value > 0, else NULL */
        (x,i) -> {}[IF(x='1',i,NULL)]
      ),
      /* filter out NULL items from the array */
      y -> y is not NULL
    ) AS trait_list

""".format(traits_array)
# filter(
#   transform(
#     reverse(split(string(conv(traits,10,2)),'(?!$)')),
#     (x,i) -> array('Traits_0','Traits_1','Traits_2','Traits_3','Traits_4','Traits_5','Traits_6','Traits_7')[IF(x='1',i,NULL)]
#   ),
#   y -> y is not NULL
# )

df.withColumn("traits_list", expr(sql_expr)).show(truncate=False)
+-----+------+--------------------+
|name |traits|traits_list         |
+-----+------+--------------------+
|name1|5     |[Traits_0, Traits_2]|
|name2|1     |[Traits_0]          |
|name3|0     |[]                  |
|name4|12    |[Traits_2, Traits_3]|
+-----+------+--------------------+

下面是 运行 reverse(split(string(conv(traits,10,2)),'(?!$)')) 之后的结果,请注意拆分模式 (?!$) 用于避免将 NULL 显示为最后一个数组项。

df.selectExpr("*", "reverse(split(string(conv(traits,10,2)),'(?!$)')) as t1").show()
+-----+------+------------+
| name|traits|          t1|
+-----+------+------------+
|name1|     5|   [1, 0, 1]|
|name2|     1|         [1]|
|name3|     0|         [0]|
|name4|    12|[0, 0, 1, 1]|
+-----+------+------------+