将 URI 查询字符串转换为 PySpark 中的结构键值数组
Convert a URI query string to an Array of Struct key-value in PySpark
我在 PySpark 中有一个 DataFrame,其中有一列 URI 查询字符串 (StringType),如下所示:
+--------------+
| cs_uri_query |
+--------------+
| a=1&b=2&c=3 |
+--------------+
| d&e=&f=4 |
+--------------+
我需要将此列转换为具有以下结构的 StructField 元素的 ArrayType:
ArrayType(StructType([StructField('key', StringType(), nullable=False),
StructField('value', StringType(), nullable=True)]))
我期望的专栏是这样的:
+------------------------------------------------------------+
| cs_uri_query |
+------------------------------------------------------------+
| [{key=a, value=1},{key=b, value=2},{key=c, value=3}] |
+------------------------------------------------------------+
| [{key=d, value=null},{key=e, value=null},{key=f, value=4}] |
+------------------------------------------------------------+
UDF 是我发现实现此目的的唯一方法。
我正在使用纯 Spark 函数,如果可能的话,我想避免使用 UDF ...
UDF 在 PySpark 上的性能很差,这与在 Scala lang 上使用 Spark 不同。
这是我使用 UDF 的代码:
def parse_query(query):
args = None
if query:
args = []
for arg in query.split("&"):
if arg:
if "=" in arg:
a = arg.split("=")
if a[0]:
v = a[1] if a[1] else None
args.append({"key": a[0], "value": v})
else:
args.append({"key": arg, "value": None})
return args
uri_query = ArrayType(StructType([StructField('key', StringType(), nullable=True),
StructField('value', StringType(), nullable=True)]))
udf_parse_query = udf(lambda args: parse_query(args), uri_query)
df = df.withColumn("cs_uri_query", udf_parse_query(df["cs_uri_query"]))
谁能用惊人的解决方案让我大开眼界?
对于 Spark 2.4+,您可以通过 &
split
然后使用 transform
函数将每个元素 key=value
转换为 struct(key, value)
:
from pyspark.sql.functions import expr
df = spark.createDataFrame([("a=1&b=2&c=3",), ("d&e=&f=4",)], ["cs_uri_query"])
transform_expr = """transform(split(cs_uri_query, '&'),
x -> struct(split(x, '=')[0] as key, split(x, '=')[1] as value)
)
"""
df.withColumn("cs_uri_query", expr(transform_expr)).show(truncate=False)
#+------------------------+
#|cs_uri_query |
#+------------------------+
#|[[a, 1], [b, 2], [c, 3]]|
#|[[d,], [e, ], [f, 4]] |
#+------------------------+
编辑
如果要过滤掉 null 或空的键,则可以将 filter
与上述转换表达式一起使用:
transform_expr = """filter(transform(split(cs_uri_query, '&'),
x -> struct(split(x, '=')[0] as key, split(x, '=')[1] as value)
),
x -> ifnull(x.key, '') <> ''
)
"""
我在 PySpark 中有一个 DataFrame,其中有一列 URI 查询字符串 (StringType),如下所示:
+--------------+
| cs_uri_query |
+--------------+
| a=1&b=2&c=3 |
+--------------+
| d&e=&f=4 |
+--------------+
我需要将此列转换为具有以下结构的 StructField 元素的 ArrayType:
ArrayType(StructType([StructField('key', StringType(), nullable=False),
StructField('value', StringType(), nullable=True)]))
我期望的专栏是这样的:
+------------------------------------------------------------+
| cs_uri_query |
+------------------------------------------------------------+
| [{key=a, value=1},{key=b, value=2},{key=c, value=3}] |
+------------------------------------------------------------+
| [{key=d, value=null},{key=e, value=null},{key=f, value=4}] |
+------------------------------------------------------------+
UDF 是我发现实现此目的的唯一方法。 我正在使用纯 Spark 函数,如果可能的话,我想避免使用 UDF ... UDF 在 PySpark 上的性能很差,这与在 Scala lang 上使用 Spark 不同。
这是我使用 UDF 的代码:
def parse_query(query):
args = None
if query:
args = []
for arg in query.split("&"):
if arg:
if "=" in arg:
a = arg.split("=")
if a[0]:
v = a[1] if a[1] else None
args.append({"key": a[0], "value": v})
else:
args.append({"key": arg, "value": None})
return args
uri_query = ArrayType(StructType([StructField('key', StringType(), nullable=True),
StructField('value', StringType(), nullable=True)]))
udf_parse_query = udf(lambda args: parse_query(args), uri_query)
df = df.withColumn("cs_uri_query", udf_parse_query(df["cs_uri_query"]))
谁能用惊人的解决方案让我大开眼界?
对于 Spark 2.4+,您可以通过 &
split
然后使用 transform
函数将每个元素 key=value
转换为 struct(key, value)
:
from pyspark.sql.functions import expr
df = spark.createDataFrame([("a=1&b=2&c=3",), ("d&e=&f=4",)], ["cs_uri_query"])
transform_expr = """transform(split(cs_uri_query, '&'),
x -> struct(split(x, '=')[0] as key, split(x, '=')[1] as value)
)
"""
df.withColumn("cs_uri_query", expr(transform_expr)).show(truncate=False)
#+------------------------+
#|cs_uri_query |
#+------------------------+
#|[[a, 1], [b, 2], [c, 3]]|
#|[[d,], [e, ], [f, 4]] |
#+------------------------+
编辑
如果要过滤掉 null 或空的键,则可以将 filter
与上述转换表达式一起使用:
transform_expr = """filter(transform(split(cs_uri_query, '&'),
x -> struct(split(x, '=')[0] as key, split(x, '=')[1] as value)
),
x -> ifnull(x.key, '') <> ''
)
"""