Trouble with Groupby loop in Python/PySpark
I am converting some SAS code to PySpark and learning as I go. I'm trying to convert the following code so that it creates two tables (NEED_VAR1 and NEED_VAR2).
%MACRO split(VAR=);
PROC SUMMARY DATA=HAVE NWAY MISSING;
CLASS &VAR. &VAR._DESC;
VAR col1 col2 col3;
OUTPUT OUT=NEED_&VAR.(DROP=_FREQ_ _TYPE_) SUM=;
RUN;
%MEND;
%SPLIT(VAR=VAR1);
%SPLIT(VAR=VAR2);
What I have so far... doesn't do the trick. I'd like to feed 'VAR1' into each '{}' that produces NEED_VAR1... and then do the same for VAR2. The two groupby variable pairs should be VAR1/VAR2 and VAR1_DESC/VAR2_DESC. Any suggestions?
import pyspark.sql.functions as F
VAR = [['VAR1','VAR2'],['VAR1','VAR2'],['VAR1','VAR2']]
for start,continue,end in VAR:
    NEED_'{}' = HAVE.groupBy('{}',"'{}'_DESC") \
        .agg(F.sum('col1').alias('col1'), \
             F.sum('col2').alias('col2'), \
             F.sum('col3').alias('col3'), \
        .format(start,continue,end)
NEED_VAR1.show()
NEED_VAR2.show()
*** Update - some sample code:
import pandas as pd
df = {'VAR1':['14200', '38110', '02120', '15831'],
'VAR1_DESC':['Drug1', 'Drug2', 'Drug3', 'Drug4'],
'VAR2':['200', '110', '120', '831'],
'VAR2_DESC':['Drug1_2', 'Drug2_2', 'Drug3_2', 'Drug4_2'],
'col1':[297.62, 340.67, 12.45, 1209.87],
'col2':[200.32, 210.37, 19.39, 1800.85],
'col3':[1294.65, 322.90, 193.45, 14.59]
}
HAVE = pd.DataFrame(df)
print(HAVE)
Creating variable names via string concatenation is syntactically incorrect and not allowed.
If you only have 2 groups to work with, you can create the individual DataFrames directly without using a loop. If there are multiple other combinations, update that in the comments and I will update the answer accordingly.
# Note: HAVE here must be a Spark DataFrame, not the pandas DataFrame from the question
NEED_VAR1 = HAVE.groupBy(["VAR1", "VAR2"]) \
                .agg(F.sum('col1').alias('col1'),
                     F.sum('col2').alias('col2'),
                     F.sum('col3').alias('col3'))

NEED_VAR2 = HAVE.groupBy(["VAR1_DESC", "VAR2_DESC"]) \
                .agg(F.sum('col1').alias('col1'),
                     F.sum('col2').alias('col2'),
                     F.sum('col3').alias('col3'))
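If you do still want to drive this with a loop, a common pattern is to keep the results in a dictionary keyed by name rather than trying to build variable names from strings. A minimal sketch under that assumption (the names group_cols and results are illustrative; HAVE is again assumed to be a Spark DataFrame and F is pyspark.sql.functions):
# Hypothetical dict-of-DataFrames alternative to dynamically named variables
group_cols = {
    'NEED_VAR1': ['VAR1', 'VAR2'],
    'NEED_VAR2': ['VAR1_DESC', 'VAR2_DESC'],
}

results = {
    name: HAVE.groupBy(cols)
              .agg(F.sum('col1').alias('col1'),
                   F.sum('col2').alias('col2'),
                   F.sum('col3').alias('col3'))
    for name, cols in group_cols.items()
}

results['NEED_VAR1'].show()
results['NEED_VAR2'].show()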
Dynamic Approach
The approach you can take is to create a split_map that holds all the required groups/levels to aggregate on.
Additionally, the result for each group needs to be stored in a container [list], which can either be used individually or unioned together to form a single DataFrame.
Note - union has a few caveats to be aware of, which I have taken care of in the snippet below.
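For context on that caveat: union / unionAll in Spark aligns columns by position, not by name, which is why the grouping columns are renamed to common names before the union. A minimal illustration, assuming an active SparkSession named spark:
# union() matches columns by POSITION, not by name
a = spark.createDataFrame([('14200', '200', 297.62)], ['VAR1', 'VAR2', 'col1'])
b = spark.createDataFrame([('Drug1', 'Drug1_2', 297.62)], ['VAR1_DESC', 'VAR2_DESC', 'col1'])

# Succeeds because the schemas line up positionally, but b's key values end up under a's column names
a.union(b).show()

# a.unionByName(b) would fail here, since the column names themselves differ -
# hence the renaming to common names in the snippet below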
Data Preparation
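The snippets below operate on a Spark DataFrame named sparkDF. Its creation is not shown in the answer, but it could be built from the pandas HAVE sample above roughly like this (a sketch assuming an active SparkSession named spark):
# Convert the pandas sample data to a Spark DataFrame
sparkDF = spark.createDataFrame(HAVE)
sparkDF.show()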
+-----+---------+----+---------+-------+-------+-------+
| VAR1|VAR1_DESC|VAR2|VAR2_DESC| col1| col2| col3|
+-----+---------+----+---------+-------+-------+-------+
|14200| Drug1| 200| Drug1_2| 297.62| 200.32|1294.65|
|38110| Drug2| 110| Drug2_2| 340.67| 210.37| 322.9|
|02120| Drug3| 120| Drug3_2| 12.45| 19.39| 193.45|
|15831| Drug4| 831| Drug4_2|1209.87|1800.85| 14.59|
+-----+---------+----+---------+-------+-------+-------+
Split Map & Data Union
from functools import reduce

def concat_spark_dataframe(in_lst):
    # Union all DataFrames in the list into a single DataFrame
    return reduce(lambda x, y: x.unionAll(y), in_lst)

split_map = {
    'group_1': ['VAR1', 'VAR2'],
    'group_2': ['VAR1_DESC', 'VAR2_DESC']
}

res = []
agg_cols = ['col1', 'col2', 'col3']
agg_func = [F.sum(F.col(c)).alias(c) for c in agg_cols]

for key in split_map:
    # Common column names so the per-group results can be unioned by position
    imm_column_map = ['Group Key - 1', 'Group Key - 2']
    group_col_lst = split_map[key]
    final_col_lst = group_col_lst + agg_cols

    immDF = sparkDF.select(*final_col_lst)

    # Rename each grouping column to its common name
    immDF = reduce(lambda x, idx: x.withColumnRenamed(group_col_lst[idx],
                                                      imm_column_map[idx]),
                   range(len(imm_column_map)),
                   immDF)

    res += [
        immDF.groupBy(imm_column_map)
             .agg(*agg_func)
             .withColumn('group-key', F.lit(key))
             .withColumn('group-value', F.lit('-'.join(group_col_lst)))
    ]
finalDF = concat_spark_dataframe(res)
finalDF.show()
+-------------+-------------+-------+-------+-------+---------+-------------------+
|Group Key - 1|Group Key - 2| col1| col2| col3|group-key| group-value|
+-------------+-------------+-------+-------+-------+---------+-------------------+
| 14200| 200| 297.62| 200.32|1294.65| group_1| VAR1-VAR2|
| 15831| 831|1209.87|1800.85| 14.59| group_1| VAR1-VAR2|
| 02120| 120| 12.45| 19.39| 193.45| group_1| VAR1-VAR2|
| 38110| 110| 340.67| 210.37| 322.9| group_1| VAR1-VAR2|
| Drug2| Drug2_2| 340.67| 210.37| 322.9| group_2|VAR1_DESC-VAR2_DESC|
| Drug1| Drug1_2| 297.62| 200.32|1294.65| group_2|VAR1_DESC-VAR2_DESC|
| Drug3| Drug3_2| 12.45| 19.39| 193.45| group_2|VAR1_DESC-VAR2_DESC|
| Drug4| Drug4_2|1209.87|1800.85| 14.59| group_2|VAR1_DESC-VAR2_DESC|
+-------------+-------------+-------+-------+-------+---------+-------------------+
Individual Splits
NEW_VAR1 = finalDF.filter(F.col('group-key') == 'group_1')
NEW_VAR2 = finalDF.filter(F.col('group-key') == 'group_2')
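If you want each split to look like the original NEED_VAR1 / NEED_VAR2 output again, you can drop the bookkeeping columns and rename the keys back (a sketch; the renames below are an assumption, not part of the answer):
# Drop the metadata columns and restore the group_1 column names
NEW_VAR1 = (finalDF.filter(F.col('group-key') == 'group_1')
                   .drop('group-key', 'group-value')
                   .withColumnRenamed('Group Key - 1', 'VAR1')
                   .withColumnRenamed('Group Key - 2', 'VAR2'))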
The solution is dynamic in nature and can accommodate any number (N) of groups and splits.
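For example, adding a third split only requires one more entry in split_map (a hypothetical extension, defined before the loop runs); the loop, union, and filter logic need no changes:
split_map = {
    'group_1': ['VAR1', 'VAR2'],
    'group_2': ['VAR1_DESC', 'VAR2_DESC'],
    'group_3': ['VAR1', 'VAR2_DESC']   # hypothetical extra grouping
}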