在 groupBy(x).agg 中过滤以在 Pyspark Dataframe 的不同子集上创建平均值

Filtering within groupBy(x).agg to create averages on different subsets of the Pyspark Dataframe

通常当我必须进行聚合时,我会在 PySpark 中使用类似于以下代码的代码:

import pyspark.sql.functions as f

df_aggregate = df.groupBy('id')\
                 .agg(f.mean('value_col').alias('value_col_mean'))

现在我实际上想计算数据帧 df 的多个子集的平均值或平均值(即在不同时间 windows,例如去年的平均值,今年的平均值最近 2 年等)。我知道我可以为每个子集做 df.filter(f.col(filter_col) >= condition).groupBy....,但我更愿意在一个 'go'.

中做这件事

是否可以在 PySpark 的 .agg(..) 部分应用过滤?

编辑

一个 id 的示例数据如下(实际数据包含多个 id 值):

您可以将条件放在 when 语句中,并将它们全部放在 .agg:

import pyspark.sql.functions as f

df_aggregate = df.withColumn('value_col', f.regexp_replace('value_col', ',', '.'))\
                 .groupBy('id')\
                 .agg(f.mean(f.when(last_year_condition, f.col('value_col'))).alias('value_col_mean_last_year'),
                      f.mean(f.when(last_two_years_condition, f.col('value_col'))).alias('value_col_mean_last_two_years')
                     )