Pyspark 将列表转换为特定列中的字典
Pyspark turn a list to a dictionary in a specific column
我在 json;
中有一个看起来像这样的 spark 数据框
{
"site_id": "ABC",
"region": "Texas",
"areas": [
{
"Carbon": [
"ABB",
"ABD",
"ABE"
]
}
],
"site_name": "ABC"
}
而且我需要将“区域”列改为此列;
"areas":
[
{
"area_name": "Carbon",
"pls": [
{
"pl_name": "ABB"
},
{
"pl_name": "ABD"
},
{
"pl_name": "ABE"
}
]
}
]
我已经执行了 df.collect() 并直接操作了字典,但这增加了一些复杂性。有没有办法直接在数据框本身中执行此操作?
编辑:
这是输入模式
|-- site_id: string
|-- region: string
|-- site_name: string
|-- areas: array
| |-- element: map
| | |-- keyType: string
| | |-- valueType: array
| | | |-- element: string
在输出模式中,目标是让 valueType 也是一个字典。我实际上将数据保存到 dynamodb table 所以输出应该像我从 table.
扫描时提供的示例
加工和制作JSON据我了解不是Spark的强项。最简单的方法(不是展平然后分组然后收集然后旋转等)是使用 UDF。我完全理解 UDF 不如内置的 Spark 转换快,但如果你的数据规模不是那么大,那应该不是问题。
def transform_json(arr):
r = []
for e in arr:
for k in e.keys():
r.append({
'area_name': k,
'pls': [{'pl_name': i} for i in e[k]]
})
return r
(df
.withColumn('areas', F.udf(
transform_json,
T.ArrayType(T.StructType([
T.StructField('area_name', T.StringType()),
T.StructField('pls', T.ArrayType(T.StructType([
T.StructField('pl_name', T.StringType())
]))),
])
))('areas')
)
.show(10, False)
)
# Output
# +------------------------------------------------------------------+
# |areas |
# +------------------------------------------------------------------+
# |[{Carbon, [{ABB}, {ABD}, {ABE}]}, {Oxygen, [{ABB}, {ABD}, {ABE}]}]|
# +------------------------------------------------------------------+
# Schema
# root
# |-- areas: array (nullable = true)
# | |-- element: struct (containsNull = true)
# | | |-- area_name: string (nullable = true)
# | | |-- pls: array (nullable = true)
# | | | |-- element: struct (containsNull = true)
# | | | | |-- pl_name: string (nullable = true)
我在 json;
中有一个看起来像这样的 spark 数据框{
"site_id": "ABC",
"region": "Texas",
"areas": [
{
"Carbon": [
"ABB",
"ABD",
"ABE"
]
}
],
"site_name": "ABC"
}
而且我需要将“区域”列改为此列;
"areas":
[
{
"area_name": "Carbon",
"pls": [
{
"pl_name": "ABB"
},
{
"pl_name": "ABD"
},
{
"pl_name": "ABE"
}
]
}
]
我已经执行了 df.collect() 并直接操作了字典,但这增加了一些复杂性。有没有办法直接在数据框本身中执行此操作?
编辑: 这是输入模式
|-- site_id: string
|-- region: string
|-- site_name: string
|-- areas: array
| |-- element: map
| | |-- keyType: string
| | |-- valueType: array
| | | |-- element: string
在输出模式中,目标是让 valueType 也是一个字典。我实际上将数据保存到 dynamodb table 所以输出应该像我从 table.
扫描时提供的示例加工和制作JSON据我了解不是Spark的强项。最简单的方法(不是展平然后分组然后收集然后旋转等)是使用 UDF。我完全理解 UDF 不如内置的 Spark 转换快,但如果你的数据规模不是那么大,那应该不是问题。
def transform_json(arr):
r = []
for e in arr:
for k in e.keys():
r.append({
'area_name': k,
'pls': [{'pl_name': i} for i in e[k]]
})
return r
(df
.withColumn('areas', F.udf(
transform_json,
T.ArrayType(T.StructType([
T.StructField('area_name', T.StringType()),
T.StructField('pls', T.ArrayType(T.StructType([
T.StructField('pl_name', T.StringType())
]))),
])
))('areas')
)
.show(10, False)
)
# Output
# +------------------------------------------------------------------+
# |areas |
# +------------------------------------------------------------------+
# |[{Carbon, [{ABB}, {ABD}, {ABE}]}, {Oxygen, [{ABB}, {ABD}, {ABE}]}]|
# +------------------------------------------------------------------+
# Schema
# root
# |-- areas: array (nullable = true)
# | |-- element: struct (containsNull = true)
# | | |-- area_name: string (nullable = true)
# | | |-- pls: array (nullable = true)
# | | | |-- element: struct (containsNull = true)
# | | | | |-- pl_name: string (nullable = true)