python 和 pyspark 中多个时间段的分组依据和聚合值

Group by and aggregate values from multiple time periods in python and pyspark

我有如下数据框。我们每个月都有数据。

df = pd.DataFrame({'id': ['A', 'A', 'A', 'A', 'B', 'B', 'B', 'B'], 
              'month': ['2020-01', '2020-02', '2020-03', '2020-04',
                        '2020-01','2020-02','2020-03','2020-04'],
              'amt': [2, 3, 4, 5, 2, 3, 1, 5]})
    id  month   amt
0   A   2020-01 2
1   A   2020-02 3
2   A   2020-03 4
3   A   2020-04 5
4   B   2020-01 2
5   B   2020-02 3
6   B   2020-03 1
7   B   2020-04 5

我需要汇总多个月的值。下面是想要的结果。

    id  month_start month_end   amt
0   A   2020-01     2020-03     9
1   A   2020-02     2020-04     12
2   B   2020-01     2020-03     6
3   B   2020-02     2020-04     9

我正在寻找通用解决方案。实际情况要复杂得多。例如,开始和结束之间可能有 n 个月。如果有人可以在 python 和 pyspark 中找到解决方案,我们将不胜感激。谢谢。

groupby_rolling 在这里工作得很好。 groupby "id" 并创建滚动 n window 对象。然后找到“amt”的总和,找到滚动的第一个月和最后一个月window。请注意,由于 rolling 不接受 non-numeric 值,我们需要使用 df.index 为每个滚动 window.

n = 3
# index will be used to get the start and end months in rolling
df = df.reset_index()
r_obj = df.groupby('id').rolling(n)
out = r_obj['amt'].sum().dropna().droplevel(1).reset_index()
month_idx = r_obj['index'].agg({'start_month_idx': lambda x: x.iat[0], 'end_month_idx': lambda x: x.iat[-1]}).dropna().reset_index(drop=True)
out['start_month'] = df.loc[month_idx['start_month_idx'], 'month'].reset_index(drop=True)
out['end_month'] = df.loc[month_idx['end_month_idx'], 'month'].reset_index(drop=True)
out = out[['id', 'start_month', 'end_month', 'amt']]

输出:

  id start_month end_month   amt
0  A     2020-01   2020-03   9.0
1  A     2020-02   2020-04  12.0
2  B     2020-01   2020-03   6.0
3  B     2020-02   2020-04   9.0

在 pyspark 中,您可以在 Window 上使用 collect_list,框架边界指定为 [-n, currentRow] 之间的行, 获得 n 个连续月份,并计算 amt 的 运行 总和 Window。最后,仅过滤月数等于 n + 1:

的行
from pyspark.sql import functions as F, Window

# create spark df from pandas dataframe
sdf = spark.createDataFrame(df)

n = 2
w = Window.partitionBy("id").orderBy("month").rowsBetween(-n, Window.currentRow)

result = sdf.withColumn("months", F.collect_list("month").over(w)) \
    .withColumn("amt", F.sum("amt").over(w)) \
    .filter(F.size("months") == n + 1) \
    .select(
        F.col("id"),
        F.element_at(F.col("months"), 1).alias("month_start"),
        F.element_at(F.col("months"), -1).alias("month_end"),
        F.col("amt")
    )

result.show()
#+---+-----------+---------+---+
#| id|month_start|month_end|amt|
#+---+-----------+---------+---+
#|  A|    2020-01|  2020-03|  9|
#|  A|    2020-02|  2020-04| 12|
#|  B|    2020-01|  2020-03|  6|
#|  B|    2020-02|  2020-04|  9|
#+---+-----------+---------+---+