Convert Spark DataFrame to nested JSON using PySpark
I am trying to convert a Spark DataFrame to JSON. The DataFrame has roughly 1 million rows; the sample code below works, but its performance is very poor. The desired output is a JSON file where each member_id appears exactly once, and likewise each tag_name appears only once under its member_id. Please let me know if there is any way to do this faster.
Sample code:
from pyspark.sql import functions as ch

result = (sdf
    .groupBy('member_id', 'tag_name')
    .agg(ch.collect_list(ch.struct('detail_name', 'detail_value')).alias('detail'))
    .groupBy('member_id')
    .agg(ch.collect_list(ch.struct('tag_name', 'detail')).alias('tag'))
    .agg(ch.to_json(ch.collect_list(ch.struct('member_id', 'tag'))).alias('result')))
result.show()
detail.csv:
member_id, tag_name, detail_name, detail_value
-------------------------------------------------------
abc123, m1, Service_A, 20
abc123, m1, Service_B, 20
abc123, m2, Service_C, 10
xyz456, m3, Service_A, 5
xyz456, m3, Service_A, 10
Desired output JSON:
{
  "member_id": "abc123",
  "tag": [
    {
      "tag_name": "m1",
      "detail": [
        {"detail_name": "Service_A", "detail_value": "20"},
        {"detail_name": "Service_B", "detail_value": "20"}
      ]
    },
    {
      "tag_name": "m2",
      "detail": [
        {"detail_name": "Service_C", "detail_value": "10"}
      ]
    }
  ]
},
{
  "member_id": "xyz456",
  "tag": [
    {
      "tag_name": "m3",
      "detail": [
        {"detail_name": "Service_A", "detail_value": "5"},
        {"detail_name": "Service_A", "detail_value": "10"}
      ]
    }
  ]
}
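The two-level grouping that produces this shape can be sketched in plain Python, which makes the target structure concrete before involving Spark. This is a minimal sketch; the `rows` list below mirrors detail.csv and is an assumption for illustration, not part of the original code:

```python
import json

# Rows mirroring detail.csv: (member_id, tag_name, detail_name, detail_value)
rows = [
    ("abc123", "m1", "Service_A", "20"),
    ("abc123", "m1", "Service_B", "20"),
    ("abc123", "m2", "Service_C", "10"),
    ("xyz456", "m3", "Service_A", "5"),
    ("xyz456", "m3", "Service_A", "10"),
]

# Nest two levels deep, member_id -> tag_name -> list of details,
# mirroring the two groupBy/collect_list passes in the Spark code.
# Plain dicts preserve insertion order in Python 3.7+.
members = {}
for member_id, tag_name, detail_name, detail_value in rows:
    tags = members.setdefault(member_id, {})
    tags.setdefault(tag_name, []).append(
        {"detail_name": detail_name, "detail_value": detail_value}
    )

result = [
    {"member_id": m, "tag": [{"tag_name": t, "detail": d} for t, d in tags.items()]}
    for m, tags in members.items()
]
print(json.dumps(result, indent=2))
```

Note that the details are appended in arrival order, which is also why duplicate rows survive: nothing in this nesting deduplicates.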
duplicate.csv:
member_id, tag_name, detail_name, detail_value
-------------------------------------------------------
abc123, m1, problem_no, 'abc123xyz'
abc123, m1, problem_no, 'abc456zzz'
xyz456, m1, problem_no, 'abc123xyz'
xyz456, m1, problem_no, 'abc456zzz'
Output JSON with duplicates:
{
  "member_id": "abc123",
  "tag": [
    {
      "tag_name": "m1",
      "detail": [
        {"detail_name": "problem_no", "detail_value": "abc123xyz"},
        {"detail_name": "problem_no", "detail_value": "abc456zzz"},
        {"detail_name": "problem_no", "detail_value": "abc123xyz"},
        {"detail_name": "problem_no", "detail_value": "abc456zzz"}
      ]
    }
  ]
},
{
  "member_id": "xyz456",
  "tag": [
    {
      "tag_name": "m1",
      "detail": [
        {"detail_name": "problem_no", "detail_value": "abc123xyz"},
        {"detail_name": "problem_no", "detail_value": "abc456zzz"},
        {"detail_name": "problem_no", "detail_value": "abc123xyz"},
        {"detail_name": "problem_no", "detail_value": "abc456zzz"}
      ]
    }
  ]
}
Would you mind implementing it with a SQL statement? Build the structs layer by layer, then use the to_json function at the end to produce the JSON string.
df.createOrReplaceTempView('tmp')
sql = """
select to_json(collect_list(struct(member_id, tag))) as member
from (
    select member_id, collect_list(struct(tag_name, detail)) as tag
    from (
        select member_id, tag_name,
               collect_list(struct(detail_name, detail_value)) as detail
        from tmp
        group by member_id, tag_name
    )
    group by member_id
)
"""
df = spark.sql(sql)
df.show(truncate=False)