PySpark 解析嵌套结构数组
PySpark Parsing nested array of struct
我想从 PySpark SQL 数据帧中解析并获取特定键的值,格式如下
我可以使用 UDF 实现此目的,但处理 40 列且 JSON 大小为 100MB 需要将近 20 分钟。也尝试过爆炸,但它为每个数组元素提供了单独的行。但我只需要给定结构数组中键的特定值。
格式
array<struct<key:string,value:struct<int_value:string,string_value:string>>>
获取特定键值的函数
def getValueFunc(searcharray, searchkey):
for val in searcharray:
if val["key"] == searchkey:
if val["value"]["string_value"] is not None:
actual = val["value"]["string_value"]
return actual
elif val["value"]["int_value"] is not None:
actual = val["value"]["int_value"]
return str(actual)
else:
return "---"
.....
getValue = udf(getValueFunc, StringType())
....
# register the name rank udf template
spark.udf.register("getValue", getValue)
.....
df.select(getValue(col("event_params"), lit("category")).alias("event_category"))
对于 Spark 2.40+,您可以使用 SparkSQL 的 filter() 函数找到匹配 key == serarchkey
的第一个数组元素,然后检索它的值。下面是一个 Spark SQL 片段模板(searchkey 作为变量)来完成上面提到的第一部分。
stmt = '''filter(event_params, x -> x.key == "{}")[0]'''.format(searchkey)
运行上面的stmt加上expr()
函数,并将值(StructType)赋值给一个临时列 f1
,然后使用 coalesce()
函数检索非空值。
from pyspark.sql.functions import expr
df.withColumn('f1', expr(stmt)) \
.selectExpr("coalesce(f1.value.string_value, string(f1.value.int_value),'---') AS event_category") \
.show()
如果您对上面的代码有任何问题,请告诉我运行。
我想从 PySpark SQL 数据帧中解析并获取特定键的值,格式如下
我可以使用 UDF 实现此目的,但处理 40 列且 JSON 大小为 100MB 需要将近 20 分钟。也尝试过爆炸,但它为每个数组元素提供了单独的行。但我只需要给定结构数组中键的特定值。
格式
array<struct<key:string,value:struct<int_value:string,string_value:string>>>
获取特定键值的函数
def getValueFunc(searcharray, searchkey):
for val in searcharray:
if val["key"] == searchkey:
if val["value"]["string_value"] is not None:
actual = val["value"]["string_value"]
return actual
elif val["value"]["int_value"] is not None:
actual = val["value"]["int_value"]
return str(actual)
else:
return "---"
.....
getValue = udf(getValueFunc, StringType())
....
# register the name rank udf template
spark.udf.register("getValue", getValue)
.....
df.select(getValue(col("event_params"), lit("category")).alias("event_category"))
对于 Spark 2.40+,您可以使用 SparkSQL 的 filter() 函数找到匹配 key == serarchkey
的第一个数组元素,然后检索它的值。下面是一个 Spark SQL 片段模板(searchkey 作为变量)来完成上面提到的第一部分。
stmt = '''filter(event_params, x -> x.key == "{}")[0]'''.format(searchkey)
运行上面的stmt加上expr()
函数,并将值(StructType)赋值给一个临时列 f1
,然后使用 coalesce()
函数检索非空值。
from pyspark.sql.functions import expr
df.withColumn('f1', expr(stmt)) \
.selectExpr("coalesce(f1.value.string_value, string(f1.value.int_value),'---') AS event_category") \
.show()
如果您对上面的代码有任何问题,请告诉我运行。