PySpark 解析嵌套结构数组

PySpark Parsing nested array of struct

我想从 PySpark SQL 数据帧中解析并获取特定键的值,格式如下

我可以使用 UDF 实现此目的,但处理 40 列且 JSON 大小为 100MB 需要将近 20 分钟。也尝试过爆炸,但它为每个数组元素提供了单独的行。但我只需要给定结构数组中键的特定值。

格式

array<struct<key:string,value:struct<int_value:string,string_value:string>>>

获取特定键值的函数

def getValueFunc(searcharray, searchkey):
    for val in searcharray:
        if val["key"] == searchkey:
            if val["value"]["string_value"] is not None:
                actual = val["value"]["string_value"]
                return actual
            elif val["value"]["int_value"] is not None:
                actual = val["value"]["int_value"]
                return str(actual)
            else:
                return "---"

.....
getValue = udf(getValueFunc, StringType())
....
# register the name rank udf template
spark.udf.register("getValue", getValue)
.....
df.select(getValue(col("event_params"), lit("category")).alias("event_category"))

对于 Spark 2.40+,您可以使用 SparkSQL 的 filter() 函数找到匹配 key == serarchkey 的第一个数组元素,然后检索它的值。下面是一个 Spark SQL 片段模板(searchkey 作为变量)来完成上面提到的第一部分。

stmt = '''filter(event_params, x -> x.key == "{}")[0]'''.format(searchkey)

运行上面的stmt加上expr()函数,并将值(StructType)赋值给一个临时列 f1,然后使用 coalesce() 函数检索非空值。

from pyspark.sql.functions import expr

df.withColumn('f1', expr(stmt)) \
    .selectExpr("coalesce(f1.value.string_value, string(f1.value.int_value),'---') AS event_category") \
    .show()

如果您对上面的代码有任何问题,请告诉我运行。