Transform column with int flags to array of strings in pyspark
I have a dataframe with a column named "traits", which is an integer composed of multiple flags.
I need to convert this column to a list of strings (for Elasticsearch indexing). The conversion looks like this:
TRAIT_0 = 0
TRAIT_1 = 1
TRAIT_2 = 2
def flag_to_list(flag: int) -> List[str]:
    trait_list = []
    if flag & (1 << TRAIT_0):
        trait_list.append("TRAIT_0")
    if flag & (1 << TRAIT_1):
        trait_list.append("TRAIT_1")
    if flag & (1 << TRAIT_2):
        trait_list.append("TRAIT_2")
    return trait_list
What is the most performant way to do this conversion in pyspark? I've seen lots of examples of string concatenation and splitting, but not an operation like this.
Using pyspark version 2.4.5.
The input JSON looks like this:
{ "name": "John Doe", "traits": 5 }
The output JSON should look like this:
{ "name": "John Doe", "traits": ["TRAIT_0", "TRAIT_2"] }
We can define a UDF to wrap your function and then call it. Here's some sample code:
from typing import List
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
TRAIT_0 = 0
TRAIT_1 = 1
TRAIT_2 = 2
def flag_to_list(flag: int) -> List[str]:
    trait_list = []
    # Test each bit independently so that multiple set flags all appear
    if flag & (1 << TRAIT_0):
        trait_list.append("TRAIT_0")
    if flag & (1 << TRAIT_1):
        trait_list.append("TRAIT_1")
    if flag & (1 << TRAIT_2):
        trait_list.append("TRAIT_2")
    return trait_list
# Wrap the function in a UDF; NULL inputs pass through as NULL
flag_to_list_udf = udf(lambda x: None if x is None else flag_to_list(x),
                       ArrayType(StringType()))
# Create dummy data to test
data = [
{ "name": "John Doe", "traits": 5 },
{ "name": "Jane Doe", "traits": 2 },
{ "name": "Jane Roe", "traits": 0 },
{ "name": "John Roe", "traits": 6 },
]
df = spark.createDataFrame(data, 'name STRING, traits INT')
df.show()
# +--------+------+
# | name|traits|
# +--------+------+
# |John Doe| 5|
# |Jane Doe| 2|
# |Jane Roe| 0|
# |John Roe| 6|
# +--------+------+
df = df.withColumn('traits_processed', flag_to_list_udf(df['traits']))
df.show()
# +--------+------+------------------+
# |    name|traits|  traits_processed|
# +--------+------+------------------+
# |John Doe|     5|[TRAIT_0, TRAIT_2]|
# |Jane Doe|     2|         [TRAIT_1]|
# |Jane Roe|     0|                []|
# |John Roe|     6|[TRAIT_1, TRAIT_2]|
# +--------+------+------------------+
If you don't want to create a new column, you can replace traits_processed with traits.
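For example, overwriting the original column in place (a minimal sketch of the same call):
df = df.withColumn('traits', flag_to_list_udf(df['traits']))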
IIUC, you can try the Spark SQL built-in functions: (1) use conv + split to convert the integer (base-10) -> binary (base-2) -> string -> array of strings (reversed); (2) based on the 0 or 1 values and their array indices, filter and transform the array into the corresponding array of named traits:
from pyspark.sql.functions import expr
df = spark.createDataFrame([("name1", 5),("name2", 1),("name3", 0),("name4", 12)], ['name', 'traits'])
#DataFrame[name: string, traits: bigint]
traits = [ "Traits_{}".format(i) for i in range(8) ]
traits_array = "array({})".format(",".join("'{}'".format(e) for e in traits))
# array('Traits_0','Traits_1','Traits_2','Traits_3','Traits_4','Traits_5','Traits_6','Traits_7')
sql_expr = """
    filter(
        transform(
            /* convert int -> binary -> string -> array of strings, then reverse the array */
            reverse(split(string(conv(traits,10,2)),'(?!$)')),
            /* take the corresponding item from traits_array when the bit is '1', else NULL */
            (x,i) -> {}[IF(x='1',i,NULL)]
        ),
        /* filter out NULL items from the array */
        y -> y is not NULL
    ) AS trait_list
""".format(traits_array)
# filter(
#   transform(
#     reverse(split(string(conv(traits,10,2)),'(?!$)')),
#     (x,i) -> array('Traits_0','Traits_1','Traits_2','Traits_3','Traits_4','Traits_5','Traits_6','Traits_7')[IF(x='1',i,NULL)]
#   ),
#   y -> y is not NULL
# )
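To trace the expression for traits = 5: conv(5,10,2) returns the string '101'; split with '(?!$)' gives ['1','0','1']; reverse leaves ['1','0','1'] (a palindrome here); transform maps each element by index to ['Traits_0', NULL, 'Traits_2']; and filter drops the NULL, leaving ['Traits_0', 'Traits_2'].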
df.withColumn("traits_list", expr(sql_expr)).show(truncate=False)
+-----+------+--------------------+
|name |traits|traits_list |
+-----+------+--------------------+
|name1|5 |[Traits_0, Traits_2]|
|name2|1 |[Traits_0] |
|name3|0 |[] |
|name4|12 |[Traits_2, Traits_3]|
+-----+------+--------------------+
Below is the result of running reverse(split(string(conv(traits,10,2)),'(?!$)')). Note that the split pattern (?!$) (a negative lookahead that does not match at the end of the string) is used to avoid a NULL/empty string showing up as the last array item.
df.selectExpr("*", "reverse(split(string(conv(traits,10,2)),'(?!$)')) as t1").show()
+-----+------+------------+
| name|traits| t1|
+-----+------+------------+
|name1| 5| [1, 0, 1]|
|name2| 1| [1]|
|name3| 0| [0]|
|name4| 12|[0, 0, 1, 1]|
+-----+------+------------+
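For comparison, here is a sketch of the same split without the lookahead; Spark's split keeps trailing empty matches, so a blank item is left at the end (output shape is indicative):
df.selectExpr("*", "split(string(conv(traits,10,2)),'') as t0").show()
# for traits = 5, t0 is something like [1, 0, 1, ] -- note the trailing empty item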