python 和 pyspark 中多个时间段的分组依据和聚合值
Group by and aggregate values from multiple time periods in python and pyspark
我有如下数据框。我们每个月都有数据。
df = pd.DataFrame({'id': ['A', 'A', 'A', 'A', 'B', 'B', 'B', 'B'],
'month': ['2020-01', '2020-02', '2020-03', '2020-04',
'2020-01','2020-02','2020-03','2020-04'],
'amt': [2, 3, 4, 5, 2, 3, 1, 5]})
id month amt
0 A 2020-01 2
1 A 2020-02 3
2 A 2020-03 4
3 A 2020-04 5
4 B 2020-01 2
5 B 2020-02 3
6 B 2020-03 1
7 B 2020-04 5
我需要汇总多个月的值。下面是想要的结果。
id month_start month_end amt
0 A 2020-01 2020-03 9
1 A 2020-02 2020-04 12
2 B 2020-01 2020-03 6
3 B 2020-02 2020-04 9
我正在寻找通用解决方案。实际情况要复杂得多。例如,开始和结束之间可能有 n
个月。如果有人可以在 python 和 pyspark 中找到解决方案,我们将不胜感激。谢谢。
groupby_rolling
在这里工作得很好。 groupby
"id" 并创建滚动 n
window 对象。然后找到“amt”的总和,找到滚动的第一个月和最后一个月window。请注意,由于 rolling
不接受 non-numeric 值,我们需要使用 df.index
为每个滚动 window.
n = 3
# index will be used to get the start and end months in rolling
df = df.reset_index()
r_obj = df.groupby('id').rolling(n)
out = r_obj['amt'].sum().dropna().droplevel(1).reset_index()
month_idx = r_obj['index'].agg({'start_month_idx': lambda x: x.iat[0], 'end_month_idx': lambda x: x.iat[-1]}).dropna().reset_index(drop=True)
out['start_month'] = df.loc[month_idx['start_month_idx'], 'month'].reset_index(drop=True)
out['end_month'] = df.loc[month_idx['end_month_idx'], 'month'].reset_index(drop=True)
out = out[['id', 'start_month', 'end_month', 'amt']]
输出:
id start_month end_month amt
0 A 2020-01 2020-03 9.0
1 A 2020-02 2020-04 12.0
2 B 2020-01 2020-03 6.0
3 B 2020-02 2020-04 9.0
在 pyspark 中,您可以在 Window 上使用 collect_list
,框架边界指定为 [-n, currentRow]
之间的行,
获得 n
个连续月份,并计算 amt
的 运行 总和 Window。最后,仅过滤月数等于 n + 1
:
的行
from pyspark.sql import functions as F, Window
# create spark df from pandas dataframe
sdf = spark.createDataFrame(df)
n = 2
w = Window.partitionBy("id").orderBy("month").rowsBetween(-n, Window.currentRow)
result = sdf.withColumn("months", F.collect_list("month").over(w)) \
.withColumn("amt", F.sum("amt").over(w)) \
.filter(F.size("months") == n + 1) \
.select(
F.col("id"),
F.element_at(F.col("months"), 1).alias("month_start"),
F.element_at(F.col("months"), -1).alias("month_end"),
F.col("amt")
)
result.show()
#+---+-----------+---------+---+
#| id|month_start|month_end|amt|
#+---+-----------+---------+---+
#| A| 2020-01| 2020-03| 9|
#| A| 2020-02| 2020-04| 12|
#| B| 2020-01| 2020-03| 6|
#| B| 2020-02| 2020-04| 9|
#+---+-----------+---------+---+
我有如下数据框。我们每个月都有数据。
df = pd.DataFrame({'id': ['A', 'A', 'A', 'A', 'B', 'B', 'B', 'B'],
'month': ['2020-01', '2020-02', '2020-03', '2020-04',
'2020-01','2020-02','2020-03','2020-04'],
'amt': [2, 3, 4, 5, 2, 3, 1, 5]})
id month amt
0 A 2020-01 2
1 A 2020-02 3
2 A 2020-03 4
3 A 2020-04 5
4 B 2020-01 2
5 B 2020-02 3
6 B 2020-03 1
7 B 2020-04 5
我需要汇总多个月的值。下面是想要的结果。
id month_start month_end amt
0 A 2020-01 2020-03 9
1 A 2020-02 2020-04 12
2 B 2020-01 2020-03 6
3 B 2020-02 2020-04 9
我正在寻找通用解决方案。实际情况要复杂得多。例如,开始和结束之间可能有 n
个月。如果有人可以在 python 和 pyspark 中找到解决方案,我们将不胜感激。谢谢。
groupby_rolling
在这里工作得很好。 groupby
"id" 并创建滚动 n
window 对象。然后找到“amt”的总和,找到滚动的第一个月和最后一个月window。请注意,由于 rolling
不接受 non-numeric 值,我们需要使用 df.index
为每个滚动 window.
n = 3
# index will be used to get the start and end months in rolling
df = df.reset_index()
r_obj = df.groupby('id').rolling(n)
out = r_obj['amt'].sum().dropna().droplevel(1).reset_index()
month_idx = r_obj['index'].agg({'start_month_idx': lambda x: x.iat[0], 'end_month_idx': lambda x: x.iat[-1]}).dropna().reset_index(drop=True)
out['start_month'] = df.loc[month_idx['start_month_idx'], 'month'].reset_index(drop=True)
out['end_month'] = df.loc[month_idx['end_month_idx'], 'month'].reset_index(drop=True)
out = out[['id', 'start_month', 'end_month', 'amt']]
输出:
id start_month end_month amt
0 A 2020-01 2020-03 9.0
1 A 2020-02 2020-04 12.0
2 B 2020-01 2020-03 6.0
3 B 2020-02 2020-04 9.0
在 pyspark 中,您可以在 Window 上使用 collect_list
,框架边界指定为 [-n, currentRow]
之间的行,
获得 n
个连续月份,并计算 amt
的 运行 总和 Window。最后,仅过滤月数等于 n + 1
:
from pyspark.sql import functions as F, Window
# create spark df from pandas dataframe
sdf = spark.createDataFrame(df)
n = 2
w = Window.partitionBy("id").orderBy("month").rowsBetween(-n, Window.currentRow)
result = sdf.withColumn("months", F.collect_list("month").over(w)) \
.withColumn("amt", F.sum("amt").over(w)) \
.filter(F.size("months") == n + 1) \
.select(
F.col("id"),
F.element_at(F.col("months"), 1).alias("month_start"),
F.element_at(F.col("months"), -1).alias("month_end"),
F.col("amt")
)
result.show()
#+---+-----------+---------+---+
#| id|month_start|month_end|amt|
#+---+-----------+---------+---+
#| A| 2020-01| 2020-03| 9|
#| A| 2020-02| 2020-04| 12|
#| B| 2020-01| 2020-03| 6|
#| B| 2020-02| 2020-04| 9|
#+---+-----------+---------+---+