Trouble with Groupby loop in Python/PySpark

I'm converting some SAS code to PySpark and learning as I go. I'm trying to convert the following code so that it creates two tables (NEED_VAR1 and NEED_VAR2).

%MACRO split(VAR=);
PROC SUMMARY DATA=HAVE NWAY MISSING;
CLASS  &VAR. &VAR._DESC;
VAR col1 col2 col3;
OUTPUT OUT=NEED_&VAR.(DROP=_FREQ_ _TYPE_) SUM=;
RUN;
%MEND;
%SPLIT(VAR=VAR1);
%SPLIT(VAR=VAR2);

What I have so far... isn't getting it done. I want to feed 'VAR1' into each '{}' to generate NEED_VAR1... and then do the same for VAR2. The two groupBy variable pairs should be VAR1/VAR2 and VAR1_DESC/VAR2_DESC. Any suggestions?

import pyspark.sql.functions as F

VAR = [['VAR1','VAR2'],['VAR1','VAR2'],['VAR1','VAR2']]

for start,continue,end in VAR:
  NEED_'{}' = HAVE.groupBy('{}',"'{}'_DESC") \
                  .agg(F.sum('col1').alias('col1'), \
                       F.sum('col2').alias('col2'), \
                       F.sum('col3').alias('col3'), \
                  .format(start,continue,end)
NEED_VAR1.show()
NEED_VAR2.show()

*** UPDATE - some sample code:

import pandas as pd
 
df = {'VAR1':['14200', '38110', '02120', '15831'],
        'VAR1_DESC':['Drug1', 'Drug2', 'Drug3', 'Drug4'],
        'VAR2':['200', '110', '120', '831'],
        'VAR2_DESC':['Drug1_2', 'Drug2_2', 'Drug3_2', 'Drug4_2'],
        'col1':[297.62, 340.67, 12.45, 1209.87],
        'col2':[200.32, 210.37, 19.39, 1800.85],
        'col3':[1294.65, 322.90, 193.45, 14.59]
        }
 
HAVE = pd.DataFrame(df)
print(HAVE)

Creating variable names via string concatenation, as in NEED_'{}', is syntactically invalid and not allowed in Python.

If you only have 2 groups to process, you can create the individual DataFrames directly without a loop (HAVE below refers to the Spark version of the sample data). If there are multiple other combinations, mention them in the comments and I'll update the answer accordingly.

NEED_VAR1 = HAVE.groupBy(["VAR1", "VAR2"]) \
                .agg(F.sum('col1').alias('col1'),
                     F.sum('col2').alias('col2'),
                     F.sum('col3').alias('col3'))

NEED_VAR2 = HAVE.groupBy(["VAR1_DESC", "VAR2_DESC"]) \
                .agg(F.sum('col1').alias('col1'),
                     F.sum('col2').alias('col2'),
                     F.sum('col3').alias('col3'))
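
If you do want a single loop, the idiomatic substitute for dynamically generated variable names is a plain dict keyed by the group name. A minimal sketch (not from the original answer), assuming HAVE is a Spark DataFrame, pairing each variable with its _DESC column the way the SAS macro's CLASS statement does:

import pyspark.sql.functions as F

agg_exprs = [F.sum(c).alias(c) for c in ['col1', 'col2', 'col3']]

# A dict replaces dynamically named variables like NEED_VAR1.
need = {}
for var in ['VAR1', 'VAR2']:
    # Pair each variable with its _DESC column, mirroring
    # CLASS &VAR. &VAR._DESC in the SAS macro.
    need[var] = HAVE.groupBy(var, var + '_DESC').agg(*agg_exprs)

need['VAR1'].show()
need['VAR2'].show()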

Dynamic approach

The approach you can take is to create a split_map, which will contain all the required groups/levels of aggregation.

Additionally, for each group, the result needs to be stored in a container [list], which can either be used individually or unioned together to form a single DataFrame.

Note - union has a few caveats to be aware of, which I've taken care of in the snippet below.
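
The main caveat is that union/unionAll resolve columns by position, not by name, which is why the group columns are renamed to a common schema before unioning. A small illustration, assuming an active SparkSession named spark:

# union() matches columns by POSITION, not by name:
a = spark.createDataFrame([('1', 'x')], ['id', 'val'])
b = spark.createDataFrame([('y', '2')], ['val', 'id'])
a.union(b).show()        # 'y' lands under id - silently wrong
a.unionByName(b).show()  # matches by name instead - correct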

Data preparation
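
The preparation step itself isn't shown; a minimal sketch, assuming an active SparkSession named spark and the pandas DataFrame HAVE from the question's sample code:

# Convert the questioner's pandas sample into a Spark DataFrame.
sparkDF = spark.createDataFrame(HAVE)
sparkDF.show()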

+-----+---------+----+---------+-------+-------+-------+
| VAR1|VAR1_DESC|VAR2|VAR2_DESC|   col1|   col2|   col3|
+-----+---------+----+---------+-------+-------+-------+
|14200|    Drug1| 200|  Drug1_2| 297.62| 200.32|1294.65|
|38110|    Drug2| 110|  Drug2_2| 340.67| 210.37|  322.9|
|02120|    Drug3| 120|  Drug3_2|  12.45|  19.39| 193.45|
|15831|    Drug4| 831|  Drug4_2|1209.87|1800.85|  14.59|
+-----+---------+----+---------+-------+-------+-------+

Split map and data union

from functools import reduce
import pyspark.sql.functions as F

def concat_spark_dataframe(in_lst):
    # Fold the list of per-group DataFrames into one via positional union.
    return reduce(lambda x, y: x.unionAll(y), in_lst)

split_map = {
    'group_1' :['VAR1','VAR2']
    ,'group_2' :['VAR1_DESC','VAR2_DESC']
}

res = []

agg_cols = ['col1', 'col2', 'col3']

# One SUM expression per measure column, keeping the original column name.
agg_func = [F.sum(F.col(c)).alias(c) for c in agg_cols]

for key in split_map:

    # Common output names so every group's result can be unioned by position.
    imm_column_map = ['Group Key - 1', 'Group Key - 2']

    group_col_lst = split_map[key]

    final_col_lst = group_col_lst + agg_cols

    immDF = sparkDF.select(*final_col_lst)

    # Rename each group column to its common output name.
    immDF = reduce(lambda x, idx: x.withColumnRenamed(group_col_lst[idx],
                                                      imm_column_map[idx]),
                   range(len(imm_column_map)),
                   immDF)

    # Aggregate, then tag each result with the group it came from.
    res += [immDF.groupBy(imm_column_map)
                 .agg(*agg_func)
                 .withColumn('group-key', F.lit(key))
                 .withColumn('group-value', F.lit('-'.join(group_col_lst)))]


finalDF = concat_spark_dataframe(res)

finalDF.show()

+-------------+-------------+-------+-------+-------+---------+-------------------+
|Group Key - 1|Group Key - 2|   col1|   col2|   col3|group-key|        group-value|
+-------------+-------------+-------+-------+-------+---------+-------------------+
|        14200|          200| 297.62| 200.32|1294.65|  group_1|          VAR1-VAR2|
|        15831|          831|1209.87|1800.85|  14.59|  group_1|          VAR1-VAR2|
|        02120|          120|  12.45|  19.39| 193.45|  group_1|          VAR1-VAR2|
|        38110|          110| 340.67| 210.37|  322.9|  group_1|          VAR1-VAR2|
|        Drug2|      Drug2_2| 340.67| 210.37|  322.9|  group_2|VAR1_DESC-VAR2_DESC|
|        Drug1|      Drug1_2| 297.62| 200.32|1294.65|  group_2|VAR1_DESC-VAR2_DESC|
|        Drug3|      Drug3_2|  12.45|  19.39| 193.45|  group_2|VAR1_DESC-VAR2_DESC|
|        Drug4|      Drug4_2|1209.87|1800.85|  14.59|  group_2|VAR1_DESC-VAR2_DESC|
+-------------+-------------+-------+-------+-------+---------+-------------------+

Individual splits

NEW_VAR1 = finalDF.filter(F.col('group-key') == 'group_1')
NEW_VAR2 = finalDF.filter(F.col('group-key') == 'group_2')
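
If each split is needed back in its original shape, the tag columns can be dropped and the common names mapped back; a small sketch for group_1 (the renames are assumptions based on the split_map above):

NEW_VAR1 = (finalDF.filter(F.col('group-key') == 'group_1')
                   .drop('group-key', 'group-value')
                   .withColumnRenamed('Group Key - 1', 'VAR1')
                   .withColumnRenamed('Group Key - 2', 'VAR2'))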

The solution is dynamic in nature and can accommodate any number of groups and splits.
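
For example, a hypothetical third split (not in the original answer) only needs another entry in split_map, provided it lists the same number of group columns:

split_map = {
    'group_1': ['VAR1', 'VAR2'],
    'group_2': ['VAR1_DESC', 'VAR2_DESC'],
    # hypothetical third split - the loop above handles it unchanged
    'group_3': ['VAR1', 'VAR2_DESC'],
}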