在 groupBy(x).agg 中过滤以在 Pyspark Dataframe 的不同子集上创建平均值
Filtering within groupBy(x).agg to create averages on different subsets of the Pyspark Dataframe
通常当我必须进行聚合时,我会在 PySpark 中使用类似于以下代码的代码:
import pyspark.sql.functions as f
df_aggregate = df.groupBy('id')\
.agg(f.mean('value_col').alias('value_col_mean'))
现在我实际上想计算数据帧 df
的多个子集的平均值或平均值(即在不同时间 windows,例如去年的平均值,今年的平均值最近 2 年等)。我知道我可以为每个子集做 df.filter(f.col(filter_col) >= condition).groupBy....
,但我更愿意在一个 'go'.
中做这件事
是否可以在 PySpark 的 .agg(..)
部分应用过滤?
编辑
一个 id 的示例数据如下(实际数据包含多个 id 值):
您可以将条件放在 when
语句中,并将它们全部放在 .agg
:
中
import pyspark.sql.functions as f
df_aggregate = df.withColumn('value_col', f.regexp_replace('value_col', ',', '.'))\
.groupBy('id')\
.agg(f.mean(f.when(last_year_condition, f.col('value_col'))).alias('value_col_mean_last_year'),
f.mean(f.when(last_two_years_condition, f.col('value_col'))).alias('value_col_mean_last_two_years')
)
通常当我必须进行聚合时,我会在 PySpark 中使用类似于以下代码的代码:
import pyspark.sql.functions as f
df_aggregate = df.groupBy('id')\
.agg(f.mean('value_col').alias('value_col_mean'))
现在我实际上想计算数据帧 df
的多个子集的平均值或平均值(即在不同时间 windows,例如去年的平均值,今年的平均值最近 2 年等)。我知道我可以为每个子集做 df.filter(f.col(filter_col) >= condition).groupBy....
,但我更愿意在一个 'go'.
是否可以在 PySpark 的 .agg(..)
部分应用过滤?
编辑
一个 id 的示例数据如下(实际数据包含多个 id 值):
您可以将条件放在 when
语句中,并将它们全部放在 .agg
:
import pyspark.sql.functions as f
df_aggregate = df.withColumn('value_col', f.regexp_replace('value_col', ',', '.'))\
.groupBy('id')\
.agg(f.mean(f.when(last_year_condition, f.col('value_col'))).alias('value_col_mean_last_year'),
f.mean(f.when(last_two_years_condition, f.col('value_col'))).alias('value_col_mean_last_two_years')
)