Merge JSON files by key using PySpark
The JSON files have the following format:
**Input-**
{'key-a' : [{'key1':'value1', 'key2':'value2'},{'key1':'value3', 'key2':'value4'}...],
'key-b':'value-b',
'key-c':'value-c'},
{'key-a' : [{'key1':'value5', 'key2':'value6'},{'key1':'value7', 'key2':'value8'}...],
'key-b':'value-b',
'key-c':'value-c'}
I need to merge the data so that all values of 'key-a' are combined and returned as a single JSON object in the output:
**Output-**
{'key-a' :
[{'key1':'value1', 'key2':'value2'},
{'key1':'value3', 'key2':'value4'},
{'key1':'value5', 'key2':'value6'},
{'key1':'value7', 'key2':'value8'}...],
'key-b':'value-b',
'key-c':'value-c'}
The data is loaded into a PySpark DataFrame with the following schema:
**Schema:**
key-a
|-- key1: string (nullable= false)
|-- key2: string (nullable= true)
key-b: string (nullable= true)
key-c: string (nullable= false)
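For reference, a minimal sketch of how this schema could be declared explicitly when reading the files (the field types and nullability mirror the schema above; the file path is hypothetical):
from pyspark.sql import types as T

# Assumed explicit schema: key-a is an array of {key1, key2} structs
schema = T.StructType([
    T.StructField("key-a", T.ArrayType(T.StructType([
        T.StructField("key1", T.StringType(), nullable=False),
        T.StructField("key2", T.StringType(), nullable=True),
    ])), nullable=True),
    T.StructField("key-b", T.StringType(), nullable=True),
    T.StructField("key-c", T.StringType(), nullable=False),
])

# Hypothetical path - read the JSON files with the explicit schema
df = spark.read.schema(schema).json("path/to/json/files")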
I have tried using the groupByKey function, but when I try to show() the output I get the following error: "'GroupedData' object has no attribute 'show'". How can I achieve the above transformation?
PFA - error received when trying the answer below.
This could be a solution that works for you -
from pyspark.sql import functions as F, types as T

# Create the DataFrame here
df_new = spark.createDataFrame(
    [str({"key-a": [{"key1": "value1", "key2": "value2"}, {"key1": "value3", "key2": "value4"}], "key-b": "value-b"}),
     str({"key-a": [{"key1": "value5", "key2": "value6"}, {"key1": "value7", "key2": "value8"}], "key-b": "value-b"})],
    T.StringType())
df_new.show(truncate=False)
+-----------------------------------------------------------------------------------------------------------+
|value |
+-----------------------------------------------------------------------------------------------------------+
|{'key-a': [{'key1': 'value1', 'key2': 'value2'}, {'key1': 'value3', 'key2': 'value4'}], 'key-b': 'value-b'}|
|{'key-a': [{'key1': 'value5', 'key2': 'value6'}, {'key1': 'value7', 'key2': 'value8'}], 'key-b': 'value-b'}|
+-----------------------------------------------------------------------------------------------------------+
First parse the column using from_json with the correct schema - the idea here is to get the JSON keys of the column into rows and then use groupBy.
# Parse each JSON string into a map of key -> value (values kept as strings)
df = df_new.withColumn('col', F.from_json("value",T.MapType(T.StringType(), T.StringType())))
# Explode the map so each key/value pair becomes its own row
df = df.select("col", F.explode("col").alias("x", "y"))
df.select("x", "y").show(truncate=False)
+-----+---------------------------------------------------------------------+
|x |y |
+-----+---------------------------------------------------------------------+
|key-a|[{"key1":"value1","key2":"value2"},{"key1":"value3","key2":"value4"}]|
|key-b|value-b |
|key-a|[{"key1":"value5","key2":"value6"},{"key1":"value7","key2":"value8"}]|
|key-b|value-b |
+-----+---------------------------------------------------------------------+
The logic is here - to group everything into a single row, we create a dummy column.
df_grp = df.groupBy("x").agg(F.collect_set("y").alias("y"))
df_grp = df_grp.withColumn("y", F.col("y").cast(T.StringType()))
df_grp = df_grp.withColumn("array", F.array("x", "y"))
df_grp = df_grp.withColumn("dummy_col", F.lit("1"))
df_grp = df_grp.groupBy("dummy_col").agg(F.collect_set("array"))
df_grp.show(truncate=False)
+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|dummy_col|collect_set(array) |
+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1 |[[key-a, [[{"key1":"value1","key2":"value2"},{"key1":"value3","key2":"value4"}], [{"key1":"value5","key2":"value6"},{"key1":"value7","key2":"value8"}]]], [key-b, [value-b]]]|
+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
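If the end goal is the single merged JSON object from the desired output, a possible follow-up sketch (not part of the original answer) is shown below; it assumes the files are read with an explicit array-of-structs schema like the one sketched under the question, and that key-b and key-c are identical across the records being merged:
from pyspark.sql import functions as F

# Assumption: df was read with the explicit schema sketched above, so "key-a"
# is an array of structs rather than a plain string; the path is hypothetical.
df = spark.read.schema(schema).json("path/to/json/files")

# Records to merge are assumed to share the same key-b / key-c values
merged = (
    df.groupBy("key-b", "key-c")
      .agg(F.flatten(F.collect_list("key-a")).alias("key-a"))  # concatenate all key-a arrays
)

# Serialize each merged row back into a single JSON object
merged.select(F.to_json(F.struct("key-a", "key-b", "key-c")).alias("json")).show(truncate=False)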
I have tried using the groupByKey function, but when I try to show() the output I get the following error: "'GroupedData' object has no attribute 'show'".
That causes trouble because you are not using any aggregation function in the groupBy clause.
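In other words, df.groupBy(...) by itself only returns a GroupedData object; an aggregation has to be applied first to get back a DataFrame that supports show(). A minimal illustration using the columns from the answer above:
# groupBy alone returns a GroupedData object, which has no show() method
grouped = df.groupBy("x")

# Applying an aggregation returns a DataFrame again, which can be shown
grouped.agg(F.collect_list("y").alias("y")).show(truncate=False)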