使用数组中的 key/value 对访问嵌套数据
Accessing nested data with key/value pairs in array
我有一个带有架构的数据框
root
|-- properties: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- key: string (nullable = true)
| | |-- value: string (nullable = true)
df 中 3 行的示例显示为:
[]
[{"key":"id", "value":"as143"},
{"key":"user", "value":"John Doe"},
{"key":"email", "value":"j.doe@gmail.com"},
{"key":"location", "value":"KY, USA"}]
[{"key":"id", "value":"bd143"},
{"key":"user", "value":"Adam Smith"},
{"key":"email", "value":"a.smith@gmail.com"}]
所以每个新用户都在 df 中的新行中。每行的数据量可以不同,允许空值。
我想创建一个新的 df,这样每个键都是新的列名,列中的值将是该键的所有对应值。例如,我会有一个标题为 'user' 的列,每一行都有不同的用户名。
我尝试使用此代码访问 sub-fields,但出现错误。
keys = table.select('properties.key').distinct().collect()[0][0]
table.withColumn('value', split(table.properties.value, ',')).
select(col('value'[0].alias(keys[0]),
select(col('value'[1].alias(keys[1]),
select(col('value'[2].alias(keys[2]),
select(col('value'[3].alias(keys[3])).display()
我还尝试创建一个键值映射,因为我需要为数据框中的其他列执行这些操作,对于其中的一些列,很难说有多少 key/value 对,因为它们可以是空值,所以我可以使用 pyspark.sql.functions.from_json
.
我觉得这是首选方式,但是,我也没有成功。我无法转换为地图,我认为这是因为我没有 "key":"value"
对,而是在同一行中有 "key": "key 1", "value": "value 1", "key": "key 2", "value": "value 2", "key": "key 3", "value": "value 3"
,等等。
这是我使用的代码:
table.withColumn('properties', from_json(table.properties, MapType(StringType(), StringType())))
我收到错误:
cannot resolve 'entries' due to data type mismatch: argument 1
requires string type, however 'table.properties' is of
array<struct<key:string, value:string>> type.
我不确定如何将这样的数组类型转换为映射。
假设这是您的输入数据框:
data = '[{"properties":[]},{"properties":[{"key":"id","value":"as143"},{"key":"user","value":"John Doe"},{"key":"email","value":"j.doe@gmail.com"},{"key":"location","value":"KY, USA"}]},{"properties":[{"key":"id","value":"bd143"},{"key":"user","value":"Adam Smith"},{"key":"email","value":"a.smith@gmail.com"}]}]'
table = spark.read.json(spark.sparkContext.parallelize([data]))
您的列 properties
是结构数组类型,这就是您在尝试使用 from_json
函数时收到数据不匹配错误的原因。您可以简单地展开数组以获得 2 列 key
和 value
,如下所示:
table.selectExpr("inline(properties)").show()
#+--------+-----------------+
#| key| value|
#+--------+-----------------+
#| id| as143|
#| user| John Doe|
#| email| j.doe@gmail.com|
#|location| KY, USA|
#| id| bd143|
#| user| Adam Smith|
#| email|a.smith@gmail.com|
#+--------+-----------------+
也就是说,现在您想要的是 pivot
列 key
。为了使用 pivot,我们需要一些列 ID
来分组,如果你没有这个列,你可以在分解数组之前使用 monotonically_increasing_id
函数分配一个 row_id
:
from pyspark.sql import functions as F
result = table.withColumn("row_id", F.monotonically_increasing_id()) \
.selectExpr("row_id", "inline(properties)") \
.groupBy("row_id").pivot("key").agg(F.first("value")).drop("row_id")
result.show()
#+-----------------+-----+--------+----------+
#| email| id|location| user|
#+-----------------+-----+--------+----------+
#| j.doe@gmail.com|as143| KY, USA| John Doe|
#|a.smith@gmail.com|bd143| null|Adam Smith|
#+-----------------+-----+--------+----------+
我有一个带有架构的数据框
root
|-- properties: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- key: string (nullable = true)
| | |-- value: string (nullable = true)
df 中 3 行的示例显示为:
[]
[{"key":"id", "value":"as143"},
{"key":"user", "value":"John Doe"},
{"key":"email", "value":"j.doe@gmail.com"},
{"key":"location", "value":"KY, USA"}]
[{"key":"id", "value":"bd143"},
{"key":"user", "value":"Adam Smith"},
{"key":"email", "value":"a.smith@gmail.com"}]
所以每个新用户都在 df 中的新行中。每行的数据量可以不同,允许空值。
我想创建一个新的 df,这样每个键都是新的列名,列中的值将是该键的所有对应值。例如,我会有一个标题为 'user' 的列,每一行都有不同的用户名。
我尝试使用此代码访问 sub-fields,但出现错误。
keys = table.select('properties.key').distinct().collect()[0][0]
table.withColumn('value', split(table.properties.value, ',')).
select(col('value'[0].alias(keys[0]),
select(col('value'[1].alias(keys[1]),
select(col('value'[2].alias(keys[2]),
select(col('value'[3].alias(keys[3])).display()
我还尝试创建一个键值映射,因为我需要为数据框中的其他列执行这些操作,对于其中的一些列,很难说有多少 key/value 对,因为它们可以是空值,所以我可以使用 pyspark.sql.functions.from_json
.
我觉得这是首选方式,但是,我也没有成功。我无法转换为地图,我认为这是因为我没有 "key":"value"
对,而是在同一行中有 "key": "key 1", "value": "value 1", "key": "key 2", "value": "value 2", "key": "key 3", "value": "value 3"
,等等。
这是我使用的代码:
table.withColumn('properties', from_json(table.properties, MapType(StringType(), StringType())))
我收到错误:
cannot resolve 'entries' due to data type mismatch: argument 1 requires string type, however 'table.properties' is of array<struct<key:string, value:string>> type.
我不确定如何将这样的数组类型转换为映射。
假设这是您的输入数据框:
data = '[{"properties":[]},{"properties":[{"key":"id","value":"as143"},{"key":"user","value":"John Doe"},{"key":"email","value":"j.doe@gmail.com"},{"key":"location","value":"KY, USA"}]},{"properties":[{"key":"id","value":"bd143"},{"key":"user","value":"Adam Smith"},{"key":"email","value":"a.smith@gmail.com"}]}]'
table = spark.read.json(spark.sparkContext.parallelize([data]))
您的列 properties
是结构数组类型,这就是您在尝试使用 from_json
函数时收到数据不匹配错误的原因。您可以简单地展开数组以获得 2 列 key
和 value
,如下所示:
table.selectExpr("inline(properties)").show()
#+--------+-----------------+
#| key| value|
#+--------+-----------------+
#| id| as143|
#| user| John Doe|
#| email| j.doe@gmail.com|
#|location| KY, USA|
#| id| bd143|
#| user| Adam Smith|
#| email|a.smith@gmail.com|
#+--------+-----------------+
也就是说,现在您想要的是 pivot
列 key
。为了使用 pivot,我们需要一些列 ID
来分组,如果你没有这个列,你可以在分解数组之前使用 monotonically_increasing_id
函数分配一个 row_id
:
from pyspark.sql import functions as F
result = table.withColumn("row_id", F.monotonically_increasing_id()) \
.selectExpr("row_id", "inline(properties)") \
.groupBy("row_id").pivot("key").agg(F.first("value")).drop("row_id")
result.show()
#+-----------------+-----+--------+----------+
#| email| id|location| user|
#+-----------------+-----+--------+----------+
#| j.doe@gmail.com|as143| KY, USA| John Doe|
#|a.smith@gmail.com|bd143| null|Adam Smith|
#+-----------------+-----+--------+----------+