使用 Python 将嵌套 Json 更新为另一个嵌套 Json
Update the Nested Json with another Nested Json using Python
例如,我有一整套嵌套 JSON,我需要用另一个嵌套 JSON.
的最新值更新此 JSON
谁能帮我解决这个问题?
我想在 Pyspark 中实现它。
全套Json看起来像这样:
{
"email": "abctest@xxx.com",
"firstName": "name01",
"id": 6304,
"surname": "Optional",
"layer01": {
"key1": "value1",
"key2": "value2",
"key3": "value3",
"key4": "value4",
"layer02": {
"key1": "value1",
"key2": "value2"
},
"layer03": [
{
"inner_key01": "inner value01"
},
{
"inner_key02": "inner_value02"
}
]
},
"surname": "Required only$uid"
}
最新的Json看起来像这样:
{
"email": "test@xxx.com",
"firstName": "name01",
"surname": "Optional",
"id": 6304,
"layer01": {
"key1": "value1",
"key2": "value2",
"key3": "value3",
"key4": "value4",
"layer02": {
"key1": "value1_changedData",
"key2": "value2"
},
"layer03": [
{
"inner_key01": "inner value01"
},
{
"inner_key02": "inner_value02"
}
]
},
"surname": "Required only$uid"
}
在上面的 id=6304
中,我们收到了 layer01.layer02.key1
和 emailaddress
字段的更新。
所以我需要将这些值更新为完整 JSON,请帮助我。
您可以将 2 个 JSON 文件加载到 Spark 数据帧中并执行 left_join
从最新的 JSON 数据中获取更新:
from pyspark.sql import functions as F
full_json_df = spark.read.json(full_json_path, multiLine=True)
latest_json_df = spark.read.json(latest_json_path, multiLine=True)
updated_df = full_json_df.alias("full").join(
latest_json_df.alias("latest"),
F.col("full.id") == F.col("latest.id"),
"left"
).select(
F.col("full.id"),
*[
F.when(F.col("latest.id").isNotNull(), F.col(f"latest.{c}")).otherwise(F.col(f"full.{c}")).alias(c)
for c in full_json_df.columns if c != 'id'
]
)
updated_df.show(truncate=False)
#+----+------------+---------+-----------------------------------------------------------------------------------------------------+--------+
#|id |email |firstName|layer01 |surname |
#+----+------------+---------+-----------------------------------------------------------------------------------------------------+--------+
#|6304|test@xxx.com|name01 |[value1, value2, value3, value4, [value1_changedData, value2], [[inner value01,], [, inner_value02]]]|Optional|
#+----+------------+---------+-----------------------------------------------------------------------------------------------------+--------+
更新:
如果架构在 完整 和 最新 JSON 之间发生变化,您可以将这两个文件加载到相同的数据中框架(通过这种方式合并模式),然后根据 id
:
进行重复数据删除
from pyspark.sql import Window
from pyspark.sql import functions as F
merged_json_df = spark.read.json("/path/to/{full_json.json,latest_json.json}", multiLine=True)
# order priority: latest file then full
w = Window.partitionBy(F.col("id")).orderBy(F.when(F.input_file_name().like('%latest%'), 0).otherwise(1))
updated_df = merged_json_df.withColumn("rn", F.row_number().over(w))\
.filter("rn = 1")\
.drop("rn")
updated_df.show(truncate=False)
例如,我有一整套嵌套 JSON,我需要用另一个嵌套 JSON.
的最新值更新此 JSON谁能帮我解决这个问题?
我想在 Pyspark 中实现它。
全套Json看起来像这样:
{
"email": "abctest@xxx.com",
"firstName": "name01",
"id": 6304,
"surname": "Optional",
"layer01": {
"key1": "value1",
"key2": "value2",
"key3": "value3",
"key4": "value4",
"layer02": {
"key1": "value1",
"key2": "value2"
},
"layer03": [
{
"inner_key01": "inner value01"
},
{
"inner_key02": "inner_value02"
}
]
},
"surname": "Required only$uid"
}
最新的Json看起来像这样:
{
"email": "test@xxx.com",
"firstName": "name01",
"surname": "Optional",
"id": 6304,
"layer01": {
"key1": "value1",
"key2": "value2",
"key3": "value3",
"key4": "value4",
"layer02": {
"key1": "value1_changedData",
"key2": "value2"
},
"layer03": [
{
"inner_key01": "inner value01"
},
{
"inner_key02": "inner_value02"
}
]
},
"surname": "Required only$uid"
}
在上面的 id=6304
中,我们收到了 layer01.layer02.key1
和 emailaddress
字段的更新。
所以我需要将这些值更新为完整 JSON,请帮助我。
您可以将 2 个 JSON 文件加载到 Spark 数据帧中并执行 left_join
从最新的 JSON 数据中获取更新:
from pyspark.sql import functions as F
full_json_df = spark.read.json(full_json_path, multiLine=True)
latest_json_df = spark.read.json(latest_json_path, multiLine=True)
updated_df = full_json_df.alias("full").join(
latest_json_df.alias("latest"),
F.col("full.id") == F.col("latest.id"),
"left"
).select(
F.col("full.id"),
*[
F.when(F.col("latest.id").isNotNull(), F.col(f"latest.{c}")).otherwise(F.col(f"full.{c}")).alias(c)
for c in full_json_df.columns if c != 'id'
]
)
updated_df.show(truncate=False)
#+----+------------+---------+-----------------------------------------------------------------------------------------------------+--------+
#|id |email |firstName|layer01 |surname |
#+----+------------+---------+-----------------------------------------------------------------------------------------------------+--------+
#|6304|test@xxx.com|name01 |[value1, value2, value3, value4, [value1_changedData, value2], [[inner value01,], [, inner_value02]]]|Optional|
#+----+------------+---------+-----------------------------------------------------------------------------------------------------+--------+
更新:
如果架构在 完整 和 最新 JSON 之间发生变化,您可以将这两个文件加载到相同的数据中框架(通过这种方式合并模式),然后根据 id
:
from pyspark.sql import Window
from pyspark.sql import functions as F
merged_json_df = spark.read.json("/path/to/{full_json.json,latest_json.json}", multiLine=True)
# order priority: latest file then full
w = Window.partitionBy(F.col("id")).orderBy(F.when(F.input_file_name().like('%latest%'), 0).otherwise(1))
updated_df = merged_json_df.withColumn("rn", F.row_number().over(w))\
.filter("rn = 1")\
.drop("rn")
updated_df.show(truncate=False)