Creating JSON string using two PySpark columns by GroupBy
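I have a Spark dataframe as shown below. I want to create a column 'new_col' that groups by all columns other than 'Code' and 'Department' and assigns a JSON structure based on the 'Code' and 'Department' columns.
The dataframe needs to be sorted first. Rows 1-3 and rows 4-5 are duplicates except for the Code and Department columns. So for the first three rows I would create the JSON in new_col as {"Code": "A", "Department": "Department Store"}, {"Code": "B", "Department": "All Other Suppliers"}, {"Code": "C", "Department": "Rest"}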
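To make the target value concrete, the transformation described above can be sketched in plain Python, without Spark. This is only an illustration: the sample rows are the ones used in the answer's createDataFrame call, and group_key and json_per_group are hypothetical helper names, not part of any library.

```python
import json
from itertools import groupby

# Sample rows, column order: Name, Address, State, Code, Department, col1, col2.
rows = [
    ("XYZ", "324 NW", "VA", "A", "Department Store", "X", "Y"),
    ("XYZ", "324 NW", "VA", "B", "All Other Suppliers", "X", "Y"),
    ("XYZ", "324 NW", "VA", "C", "Rest", "X", "Y"),
    ("ABC", "45 N Ave", "MA", "C", "Rest", "A", "A"),
    ("ABC", "45 N Ave", "MA", "B", "All Other Suppliers", "A", "A"),
    ("ZXC", "12 SW Street", "NY", "A", "Department Store", "B", "Z"),
]


def group_key(row):
    """Every column except Code (index 3) and Department (index 4)."""
    return row[:3] + row[5:]


def json_per_group(rows):
    """Map each group key to its comma-joined JSON objects, sorted by Code."""
    # Sort by group first, then by Code, so groupby sees contiguous groups.
    ordered = sorted(rows, key=lambda r: (group_key(r), r[3]))
    result = {}
    for key, members in groupby(ordered, key=group_key):
        result[key] = ",".join(
            json.dumps({"Code": r[3], "Department": r[4]}, separators=(",", ":"))
            for r in members
        )
    return result


new_col = json_per_group(rows)
for key, value in new_col.items():
    print(key, value)
```

Each group ends up with one string in which the Code/Department pairs appear in ascending Code order.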
My input dataframe:
Expected output Spark dataframe:
This should do it:
from pyspark.sql import functions as F, Window as W

# `spark` is assumed to be an existing SparkSession.
df = spark.createDataFrame(
    [('XYZ', '324 NW', 'VA', 'A', 'Department Store', 'X', 'Y'),
     ('XYZ', '324 NW', 'VA', 'B', 'All Other Suppliers', 'X', 'Y'),
     ('XYZ', '324 NW', 'VA', 'C', 'Rest', 'X', 'Y'),
     ('ABC', '45 N Ave', 'MA', 'C', 'Rest', 'A', 'A'),
     ('ABC', '45 N Ave', 'MA', 'B', 'All Other Suppliers', 'A', 'A'),
     ('ZXC', '12 SW Street', 'NY', 'A', 'Department Store', 'B', 'Z')],
    ['Name', 'Address', 'State', 'Code', 'Department', 'col1', 'col2']
)

# Partition on every column except Code and Department.
cols = [c for c in df.columns if c not in ['Code', 'Department']]
w1 = W.partitionBy(cols).orderBy('Code')          # ascending: picks one row per group
w2 = W.partitionBy(cols).orderBy(F.desc('Code'))  # descending: accumulates the JSON strings

df = (df
    .withColumn('_rn', F.row_number().over(w1))
    # An ordered window defaults to a frame from the partition start to the current row,
    # so the lowest-Code row (last in w2) collects the JSON of the whole group.
    .withColumn('new_col', F.collect_list(F.to_json(F.struct(['Code', 'Department']))).over(w2))
    .withColumn("new_col", F.array_join("new_col", ","))
    .filter('_rn=1')  # keep only the lowest-Code row, whose list is complete
    .drop('_rn')
)
df.show(truncate=False)
# +----+------------+-----+----+-------------------+----+----+-----------------------------------------------------------------------------------------------------------------------------+
# |Name|Address     |State|Code|Department         |col1|col2|new_col                                                                                                                      |
# +----+------------+-----+----+-------------------+----+----+-----------------------------------------------------------------------------------------------------------------------------+
# |ABC |45 N Ave    |MA   |B   |All Other Suppliers|A   |A   |{"Code":"C","Department":"Rest"},{"Code":"B","Department":"All Other Suppliers"}                                             |
# |XYZ |324 NW      |VA   |A   |Department Store   |X   |Y   |{"Code":"C","Department":"Rest"},{"Code":"B","Department":"All Other Suppliers"},{"Code":"A","Department":"Department Store"}|
# |ZXC |12 SW Street|NY   |A   |Department Store   |B   |Z   |{"Code":"A","Department":"Department Store"}                                                                                 |
# +----+------------+-----+----+-------------------+----+----+-----------------------------------------------------------------------------------------------------------------------------+