将列中的字符串作为嵌套 JSON 存储到 JSON 文件 - Pyspark
Store string in a column as nested JSON to a JSON file - Pyspark
我有一个 pyspark 数据框,这是它的样子
+------------------------------------+-------------------+-------------+--------------------------------+---------+
|member_uuid |Timestamp |updated |member_id |easy_id |
+------------------------------------+-------------------+-------------+--------------------------------+---------+
|027130fe-584d-4d8e-9fb0-b87c984a0c20|2020-02-11 19:15:32|password_hash|ajuypjtnlzmk4na047cgav27jma6_STG|993269700|
我把上面的dataframe改成了这个,
+---------+---------+-------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+
|attribute|operation|params |timestamp |
+---------+---------+-------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+
|profile |UPDATE |{"member_uuid":"027130fe-584d-4d8e-9fb0-b87c984a0c20","member_id":"ajuypjtnlzmk4na047cgav27jma6_STG","easy_id":993269700,"field":"password_hash"}|2020-02-11 19:15:32|
使用下面的代码,
ll = ['member_uuid', 'member_id', 'easy_id', 'field']
df = df.withColumn('timestamp', col('Timestamp')).withColumn('attribute', lit('profile')).withColumn('operation', lit(col_name)) \
.withColumn('field', col('updated')).withColumn('params', F.to_json(struct([x for x in ll])))
df = df.select('attribute', 'operation', 'params', 'timestamp')
我已将此数据帧 df 转换为 JSON 后保存到文本文件中。
我尝试使用以下代码来做同样的事情,
df_final.toJSON().coalesce(1).saveAsTextFile('file')
文件包含,
{"attribute":"profile","operation":"UPDATE","params":"{\"member_uuid\":\"027130fe-584d-4d8e-9fb0-b87c984a0c20\",\"member_id\":\"ajuypjtnlzmk4na047cgav27jma6_STG\",\"easy_id\":993269700,\"field\":\"password_hash\"}","timestamp":"2020-02-11T19:15:32.000Z"}
我希望它以这种格式保存,
{"attribute":"profile","operation":"UPDATE","params":{"member_uuid":"027130fe-584d-4d8e-9fb0-b87c984a0c20","member_id":"ajuypjtnlzmk4na047cgav27jma6_STG","easy_id":993269700,"field":"password_hash"},"timestamp":"2020-02-11T19:15:32.000Z"}
to_json 将参数列中的值保存为字符串,有没有办法在此处保留 json 上下文以便我可以将其保存为所需的输出?
一个简单的处理方法是对文件进行替换操作
sourceData=open('file').read().replace('"{','{').replace('}"','}').replace('\','')
with open('file','w') as final:
final.write(sourceData)
这可能不是您想要的,但会达到最终结果。
不要使用 to_json
在数据框中创建 params
列。
这里的技巧就是创建 struct 并写入文件(使用 .saveAsTextFile
(或).write.json()
)Spark 将为 Struct 字段创建 JSON。
如果我们已经创建了 json 对象并以 json 格式写入 Spark 将添加 \
到 escape
quotes
已经存在于 Json 字符串中。
Example:
from pyspark.sql.functions import *
#sample data
df=spark.createDataFrame([("027130fe-584d-4d8e-9fb0-b87c984a0c20","2020-02-11 19:15:32","password_hash","ajuypjtnlzmk4na047cgav27jma6_STG","993269700")],["member_uuid","Timestamp","updated","member_id","easy_id"])
df1=df.withColumn("attribute",lit("profile")).withColumn("operation",lit("UPDATE"))
df1.selectExpr("struct(member_uuid,member_id,easy_id) as params","attribute","operation","timestamp").write.format("json").mode("overwrite").save("<path>")
#{"params":{"member_uuid":"027130fe-584d-4d8e-9fb0-b87c984a0c20","member_id":"ajuypjtnlzmk4na047cgav27jma6_STG","easy_id":"993269700"},"attribute":"profile","operation":"UPDATE","timestamp":"2020-02-11 19:15:32"}
df1.selectExpr("struct(member_uuid,member_id,easy_id) as params","attribute","operation","timestamp").toJSON().saveAsTextFile("<path>")
#{"params":{"member_uuid":"027130fe-584d-4d8e-9fb0-b87c984a0c20","member_id":"ajuypjtnlzmk4na047cgav27jma6_STG","easy_id":"993269700"},"attribute":"profile","operation":"UPDATE","timestamp":"2020-02-11 19:15:32"}
我有一个 pyspark 数据框,这是它的样子
+------------------------------------+-------------------+-------------+--------------------------------+---------+
|member_uuid |Timestamp |updated |member_id |easy_id |
+------------------------------------+-------------------+-------------+--------------------------------+---------+
|027130fe-584d-4d8e-9fb0-b87c984a0c20|2020-02-11 19:15:32|password_hash|ajuypjtnlzmk4na047cgav27jma6_STG|993269700|
我把上面的dataframe改成了这个,
+---------+---------+-------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+
|attribute|operation|params |timestamp |
+---------+---------+-------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+
|profile |UPDATE |{"member_uuid":"027130fe-584d-4d8e-9fb0-b87c984a0c20","member_id":"ajuypjtnlzmk4na047cgav27jma6_STG","easy_id":993269700,"field":"password_hash"}|2020-02-11 19:15:32|
使用下面的代码,
ll = ['member_uuid', 'member_id', 'easy_id', 'field']
df = df.withColumn('timestamp', col('Timestamp')).withColumn('attribute', lit('profile')).withColumn('operation', lit(col_name)) \
.withColumn('field', col('updated')).withColumn('params', F.to_json(struct([x for x in ll])))
df = df.select('attribute', 'operation', 'params', 'timestamp')
我已将此数据帧 df 转换为 JSON 后保存到文本文件中。 我尝试使用以下代码来做同样的事情,
df_final.toJSON().coalesce(1).saveAsTextFile('file')
文件包含,
{"attribute":"profile","operation":"UPDATE","params":"{\"member_uuid\":\"027130fe-584d-4d8e-9fb0-b87c984a0c20\",\"member_id\":\"ajuypjtnlzmk4na047cgav27jma6_STG\",\"easy_id\":993269700,\"field\":\"password_hash\"}","timestamp":"2020-02-11T19:15:32.000Z"}
我希望它以这种格式保存,
{"attribute":"profile","operation":"UPDATE","params":{"member_uuid":"027130fe-584d-4d8e-9fb0-b87c984a0c20","member_id":"ajuypjtnlzmk4na047cgav27jma6_STG","easy_id":993269700,"field":"password_hash"},"timestamp":"2020-02-11T19:15:32.000Z"}
to_json 将参数列中的值保存为字符串,有没有办法在此处保留 json 上下文以便我可以将其保存为所需的输出?
一个简单的处理方法是对文件进行替换操作
sourceData=open('file').read().replace('"{','{').replace('}"','}').replace('\','')
with open('file','w') as final:
final.write(sourceData)
这可能不是您想要的,但会达到最终结果。
不要使用 to_json
在数据框中创建 params
列。
这里的技巧就是创建 struct 并写入文件(使用
.saveAsTextFile
(或).write.json()
)Spark 将为 Struct 字段创建 JSON。如果我们已经创建了 json 对象并以 json 格式写入 Spark 将添加
\
到escape
quotes
已经存在于 Json 字符串中。
Example:
from pyspark.sql.functions import *
#sample data
df=spark.createDataFrame([("027130fe-584d-4d8e-9fb0-b87c984a0c20","2020-02-11 19:15:32","password_hash","ajuypjtnlzmk4na047cgav27jma6_STG","993269700")],["member_uuid","Timestamp","updated","member_id","easy_id"])
df1=df.withColumn("attribute",lit("profile")).withColumn("operation",lit("UPDATE"))
df1.selectExpr("struct(member_uuid,member_id,easy_id) as params","attribute","operation","timestamp").write.format("json").mode("overwrite").save("<path>")
#{"params":{"member_uuid":"027130fe-584d-4d8e-9fb0-b87c984a0c20","member_id":"ajuypjtnlzmk4na047cgav27jma6_STG","easy_id":"993269700"},"attribute":"profile","operation":"UPDATE","timestamp":"2020-02-11 19:15:32"}
df1.selectExpr("struct(member_uuid,member_id,easy_id) as params","attribute","operation","timestamp").toJSON().saveAsTextFile("<path>")
#{"params":{"member_uuid":"027130fe-584d-4d8e-9fb0-b87c984a0c20","member_id":"ajuypjtnlzmk4na047cgav27jma6_STG","easy_id":"993269700"},"attribute":"profile","operation":"UPDATE","timestamp":"2020-02-11 19:15:32"}