Merge JSON files by key using PySpark

The JSON files have the following format:

**Input-** 

{'key-a' : [{'key1':'value1', 'key2':'value2'},{'key1':'value3', 'key2':'value4'}...], 
'key-b':'value-b', 
'key-c':'value-c'},
{'key-a' : [{'key1':'value5', 'key2':'value6'},{'key1':'value7', 'key2':'value8'}...], 
'key-b':'value-b', 
'key-c':'value-c'}

I need to merge the data so that all the values of 'key-a' are combined, and return a single JSON object as output:

**Output-** 
{'key-a' : 
[{'key1':'value1', 'key2':'value2'},
{'key1':'value3', 'key2':'value4'},
{'key1':'value5', 'key2':'value6'},
{'key1':'value7', 'key2':'value8'}...], 
'key-b':'value-b', 
'key-c':'value-c'}

The data is loaded into a PySpark dataframe with the following schema:

**Schema:**

key-a
|-- key1: string (nullable = false)
|-- key2: string (nullable = true)
key-b: string (nullable = true)
key-c: string (nullable = false)
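
For reference, a minimal sketch of how that schema could be declared explicitly as a PySpark StructType (the struct layout of the elements under key-a is an assumption based on the sample data):

from pyspark.sql import types as T

schema = T.StructType([
    T.StructField("key-a", T.ArrayType(T.StructType([
        T.StructField("key1", T.StringType(), nullable=False),
        T.StructField("key2", T.StringType(), nullable=True)])), nullable=True),
    T.StructField("key-b", T.StringType(), nullable=True),
    T.StructField("key-c", T.StringType(), nullable=False)])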

I have tried using the groupByKey function, but when I try to show() the output I get the following error: "'GroupedData' object has no attribute 'show'".

How can I achieve the above transformation?

PFA: the error received when trying the answer below.

This might be the solution for you:

from pyspark.sql import SparkSession, functions as F, types as T

spark = SparkSession.builder.getOrCreate()

# Create the dataframe here
df_new = spark.createDataFrame(
    [str({"key-a": [{"key1": "value1", "key2": "value2"}, {"key1": "value3", "key2": "value4"}], "key-b": "value-b"}),
     str({"key-a": [{"key1": "value5", "key2": "value6"}, {"key1": "value7", "key2": "value8"}], "key-b": "value-b"})],
    T.StringType())
df_new.show(truncate=False)
+-----------------------------------------------------------------------------------------------------------+
|value                                                                                                      |
+-----------------------------------------------------------------------------------------------------------+
|{'key-a': [{'key1': 'value1', 'key2': 'value2'}, {'key1': 'value3', 'key2': 'value4'}], 'key-b': 'value-b'}|
|{'key-a': [{'key1': 'value5', 'key2': 'value6'}, {'key1': 'value7', 'key2': 'value8'}], 'key-b': 'value-b'}|
+-----------------------------------------------------------------------------------------------------------+

First parse the column using from_json with the right schema. The idea here is to get the JSON keys of each record into a column of their own, and then use groupBy:

# Parse each JSON string into a map of key -> value (values kept as plain strings)
df = df_new.withColumn('col', F.from_json("value", T.MapType(T.StringType(), T.StringType())))
# Exploding a map column yields one row per (key, value) pair
df = df.select("col", F.explode("col").alias("x", "y"))
df.select("x", "y").show(truncate=False)
+-----+---------------------------------------------------------------------+
|x    |y                                                                    |
+-----+---------------------------------------------------------------------+
|key-a|[{"key1":"value1","key2":"value2"},{"key1":"value3","key2":"value4"}]|
|key-b|value-b                                                              |
|key-a|[{"key1":"value5","key2":"value6"},{"key1":"value7","key2":"value8"}]|
|key-b|value-b                                                              |
+-----+---------------------------------------------------------------------+

The logic here: in order to group everything back into a single row, we create a dummy column.

# Collect all values for each JSON key
df_grp = df.groupBy("x").agg(F.collect_set("y").alias("y"))
df_grp = df_grp.withColumn("y", F.col("y").cast(T.StringType()))
# Pair each key with its merged values, then pull all pairs into a single row
df_grp = df_grp.withColumn("array", F.array("x", "y"))
df_grp = df_grp.withColumn("dummy_col", F.lit("1"))
df_grp = df_grp.groupBy("dummy_col").agg(F.collect_set("array"))
df_grp.show(truncate=False)

+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|dummy_col|collect_set(array)                                                                                                                                                           |
+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1        |[[key-a, [[{"key1":"value1","key2":"value2"},{"key1":"value3","key2":"value4"}], [{"key1":"value5","key2":"value6"},{"key1":"value7","key2":"value8"}]]], [key-b, [value-b]]]|
+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
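
The grouped result still holds the merged values as strings rather than as one JSON object. As a follow-up sketch (not part of the original answer, and assuming the data is small enough to collect to the driver), the exploded key/value pairs in df can be rebuilt into the single merged object the question asks for:

import json

merged = {}
for row in df.select("x", "y").collect():
    try:
        value = json.loads(row["y"])   # array values such as key-a parse as JSON
    except ValueError:
        value = row["y"]               # plain strings such as value-b stay as-is
    if isinstance(value, list):
        merged.setdefault(row["x"], []).extend(value)  # concatenate the key-a lists
    else:
        merged[row["x"]] = value

print(json.dumps(merged))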

I have tried using the groupByKey function but when I try to show() the output, I get the following error: "'GroupedData' object has no attribute 'show'".

This happens because you are not using any aggregate function in your groupBy clause.
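
To make that concrete: groupBy alone returns a pyspark.sql.GroupedData object, which has no show() method; only the DataFrame you get back after applying an aggregation can be shown. A minimal illustration on the exploded dataframe from above:

grouped = df.groupBy("x")        # GroupedData, not a DataFrame
# grouped.show()                 # AttributeError: 'GroupedData' object has no attribute 'show'

# Applying an aggregate returns a DataFrame again, which can be shown
grouped.agg(F.collect_set("y").alias("y")).show(truncate=False)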