Pyspark 将列表转换为特定列中的字典

Pyspark turn a list to a dictionary in a specific column

我在 json;

中有一个看起来像这样的 spark 数据框
{
  "site_id": "ABC",
  "region": "Texas",
  "areas": [
    {
      "Carbon": [
        "ABB",
        "ABD",
        "ABE"
      ]
    }
  ],
  "site_name": "ABC"
}

而且我需要将“区域”列改为此列;

"areas": 
  [
    {
        "area_name": "Carbon",
        "pls": [
         {
             "pl_name": "ABB"
         },
         {
             "pl_name": "ABD"
         },  
         {
             "pl_name": "ABE"
         }
       ]
    }

]

我已经执行了 df.collect() 并直接操作了字典,但这增加了一些复杂性。有没有办法直接在数据框本身中执行此操作?

编辑: 这是输入模式

|-- site_id: string
|-- region: string
|-- site_name: string
|-- areas: array
|    |-- element: map
|    |    |-- keyType: string
|    |    |-- valueType: array
|    |    |    |-- element: string

在输出模式中,目标是让 valueType 也是一个字典。我实际上将数据保存到 dynamodb table 所以输出应该像我从 table.

扫描时提供的示例

加工和制作JSON据我了解不是Spark的强项。最简单的方法(不是展平然后分组然后收集然后旋转等)是使用 UDF。我完全理解 UDF 不如内置的 Spark 转换快,但如果你的数据规模不是那么大,那应该不是问题。

def transform_json(arr):
    r = []
    for e in arr:
        for k in e.keys():
            r.append({
                'area_name': k,
                'pls': [{'pl_name': i} for i in e[k]]
            })
    return r

(df
    .withColumn('areas', F.udf(
        transform_json,
        T.ArrayType(T.StructType([
            T.StructField('area_name', T.StringType()),
            T.StructField('pls', T.ArrayType(T.StructType([
                T.StructField('pl_name', T.StringType())
            ]))),
        ])
        ))('areas')
    )
    .show(10, False)
)

# Output
# +------------------------------------------------------------------+
# |areas                                                             |
# +------------------------------------------------------------------+
# |[{Carbon, [{ABB}, {ABD}, {ABE}]}, {Oxygen, [{ABB}, {ABD}, {ABE}]}]|
# +------------------------------------------------------------------+

# Schema
# root
#  |-- areas: array (nullable = true)
#  |    |-- element: struct (containsNull = true)
#  |    |    |-- area_name: string (nullable = true)
#  |    |    |-- pls: array (nullable = true)
#  |    |    |    |-- element: struct (containsNull = true)
#  |    |    |    |    |-- pl_name: string (nullable = true)