使用数组中的 key/value 对访问嵌套数据

Accessing nested data with key/value pairs in array

我有一个带有架构的数据框

root
|-- properties: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- key: string (nullable = true)
|    |    |-- value: string (nullable = true)

df 中 3 行的示例显示为:

[]

[{"key":"id", "value":"as143"},
  {"key":"user", "value":"John Doe"},
  {"key":"email", "value":"j.doe@gmail.com"},
  {"key":"location", "value":"KY, USA"}]

[{"key":"id", "value":"bd143"},
  {"key":"user", "value":"Adam Smith"},
  {"key":"email", "value":"a.smith@gmail.com"}]

所以每个新用户都在 df 中的新行中。每行的数据量可以不同,允许空值。

我想创建一个新的 df,这样每个键都是新的列名,列中的值将是该键的所有对应值。例如,我会有一个标题为 'user' 的列,每一行都有不同的用户名。

我尝试使用此代码访问 sub-fields,但出现错误。

keys = table.select('properties.key').distinct().collect()[0][0]
table.withColumn('value', split(table.properties.value, ',')).
    select(col('value'[0].alias(keys[0]),
    select(col('value'[1].alias(keys[1]),
    select(col('value'[2].alias(keys[2]),
    select(col('value'[3].alias(keys[3])).display()

我还尝试创建一个键值映射,因为我需要为数据框中的其他列执行这些操作,对于其中的一些列,很难说有多少 key/value 对,因为它们可以是空值,所以我可以使用 pyspark.sql.functions.from_json.

我觉得这是首选方式,但是,我也没有成功。我无法转换为地图,我认为这是因为我没有 "key":"value" 对,而是在同一行中有 "key": "key 1", "value": "value 1", "key": "key 2", "value": "value 2", "key": "key 3", "value": "value 3",等等。

这是我使用的代码:

table.withColumn('properties', from_json(table.properties, MapType(StringType(), StringType())))

我收到错误:

cannot resolve 'entries' due to data type mismatch: argument 1 requires string type, however 'table.properties' is of array<struct<key:string, value:string>> type.

我不确定如何将这样的数组类型转换为映射。

假设这是您的输入数据框:

data = '[{"properties":[]},{"properties":[{"key":"id","value":"as143"},{"key":"user","value":"John Doe"},{"key":"email","value":"j.doe@gmail.com"},{"key":"location","value":"KY, USA"}]},{"properties":[{"key":"id","value":"bd143"},{"key":"user","value":"Adam Smith"},{"key":"email","value":"a.smith@gmail.com"}]}]'
table = spark.read.json(spark.sparkContext.parallelize([data]))

您的列 properties 是结构数组类型,这就是您在尝试使用 from_json 函数时收到数据不匹配错误的原因。您可以简单地展开数组以获得 2 列 keyvalue,如下所示:

table.selectExpr("inline(properties)").show()

#+--------+-----------------+
#|     key|            value|
#+--------+-----------------+
#|      id|            as143|
#|    user|         John Doe|
#|   email|  j.doe@gmail.com|
#|location|          KY, USA|
#|      id|            bd143|
#|    user|       Adam Smith|
#|   email|a.smith@gmail.com|
#+--------+-----------------+

也就是说,现在您想要的是 pivotkey。为了使用 pivot,我们需要一些列 ID 来分组,如果你没有这个列,你可以在分解数组之前使用 monotonically_increasing_id 函数分配一个 row_id

from pyspark.sql import functions as F

result = table.withColumn("row_id", F.monotonically_increasing_id()) \
    .selectExpr("row_id", "inline(properties)") \
    .groupBy("row_id").pivot("key").agg(F.first("value")).drop("row_id")

result.show()

#+-----------------+-----+--------+----------+
#|            email|   id|location|      user|
#+-----------------+-----+--------+----------+
#|  j.doe@gmail.com|as143| KY, USA|  John Doe|
#|a.smith@gmail.com|bd143|    null|Adam Smith|
#+-----------------+-----+--------+----------+