使用 Pyspark 每 6 个月为每个客户计算滚动总和费用
Calculate Rolling Sum Expense each 6 months for each customer using Pyspark
我有如下数据集,我想使用 Pyspark 为每位客户每 6 个月计算一次滚动总费用。
我在 pyspark.sql.function 中使用了 window 但没有成功。
我只是想知道是否有人可以帮助我。
谢谢。
总结起来,流程如下:
这是示例数据
d = [
{'id': 1, 'Month': 1, 'expense': 11.0,},
{'id': 1, 'Month': 1, 'expense': 15.0},
{'id': 1, 'Month': 1, 'expense': 16.0},
{'id': 1, 'Month': 2, 'expense': 12.0},
{'id': 1, 'Month': 2, 'expense': 14.0},
{'id': 1, 'Month': 3, 'expense': 6.0},
{'id': 1, 'Month': 3, 'expense': 7.0},
{'id': 1, 'Month': 3, 'expense': 4.0},
{'id': 1, 'Month': 4, 'expense': 4.0},
{'id': 1, 'Month': 5, 'expense': 6.0},
{'id': 1, 'Month': 6, 'expense': 7.0},
{'id': 1, 'Month': 7, 'expense': 8.0},
{'id': 1, 'Month': 8, 'expense': 9.0},
{'id': 2, 'Month': 1, 'expense': 1.0},
{'id': 2, 'Month': 1, 'expense': 5.0},
{'id': 2, 'Month': 1, 'expense': 6.0},
{'id': 2, 'Month': 2, 'expense': 2.0},
{'id': 2, 'Month': 2, 'expense': 4.0},
{'id': 2, 'Month': 3, 'expense': 14.0},
{'id': 2, 'Month': 3, 'expense': 17.0},
{'id': 2, 'Month': 3, 'expense': 16.0},
{'id': 1, 'Month': 4, 'expense': 4.0},
{'id': 1, 'Month': 5, 'expense': 6.0},
{'id': 1, 'Month': 6, 'expense': 7.0},
{'id': 1, 'Month': 7, 'expense': 8.0},
{'id': 1, 'Month': 8, 'expense': 9.0}]
您可以使用rangeBetween
函数来计算滚动总和。
我将我的框架边界定义为 rangeBetween(-1, 0)
。 -1
表示当前行之前的one-off,0
表示当前行。在此帧上聚合会得到 two-month 滚动平均值:
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.window import Window
w = Window().partitionBy("id").orderBy('Month')
df.groupBy("id", "Month").agg(F.sum(col("expense")).alias("total_expense")) \
.withColumn("rank", F.row_number().over(w)) \
.withColumn("two_month_rolling_sum", F.sum(col("total_expense")).over(w.rangeBetween(-1, 0))) \
.filter(col("rank")!=1)\
.drop("total_expense", "rank")\
.orderBy("id", "Month").show()
输出:
+---+-----+---------------------+
| id|Month|two_month_rolling_sum|
+---+-----+---------------------+
| 1| 2| 68.0|
| 1| 3| 43.0|
| 2| 2| 18.0|
| 2| 3| 53.0|
+---+-----+---------------------+
我有如下数据集,我想使用 Pyspark 为每位客户每 6 个月计算一次滚动总费用。 我在 pyspark.sql.function 中使用了 window 但没有成功。 我只是想知道是否有人可以帮助我。
谢谢。
总结起来,流程如下:
d = [
{'id': 1, 'Month': 1, 'expense': 11.0,},
{'id': 1, 'Month': 1, 'expense': 15.0},
{'id': 1, 'Month': 1, 'expense': 16.0},
{'id': 1, 'Month': 2, 'expense': 12.0},
{'id': 1, 'Month': 2, 'expense': 14.0},
{'id': 1, 'Month': 3, 'expense': 6.0},
{'id': 1, 'Month': 3, 'expense': 7.0},
{'id': 1, 'Month': 3, 'expense': 4.0},
{'id': 1, 'Month': 4, 'expense': 4.0},
{'id': 1, 'Month': 5, 'expense': 6.0},
{'id': 1, 'Month': 6, 'expense': 7.0},
{'id': 1, 'Month': 7, 'expense': 8.0},
{'id': 1, 'Month': 8, 'expense': 9.0},
{'id': 2, 'Month': 1, 'expense': 1.0},
{'id': 2, 'Month': 1, 'expense': 5.0},
{'id': 2, 'Month': 1, 'expense': 6.0},
{'id': 2, 'Month': 2, 'expense': 2.0},
{'id': 2, 'Month': 2, 'expense': 4.0},
{'id': 2, 'Month': 3, 'expense': 14.0},
{'id': 2, 'Month': 3, 'expense': 17.0},
{'id': 2, 'Month': 3, 'expense': 16.0},
{'id': 1, 'Month': 4, 'expense': 4.0},
{'id': 1, 'Month': 5, 'expense': 6.0},
{'id': 1, 'Month': 6, 'expense': 7.0},
{'id': 1, 'Month': 7, 'expense': 8.0},
{'id': 1, 'Month': 8, 'expense': 9.0}]
您可以使用rangeBetween
函数来计算滚动总和。
我将我的框架边界定义为 rangeBetween(-1, 0)
。 -1
表示当前行之前的one-off,0
表示当前行。在此帧上聚合会得到 two-month 滚动平均值:
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.window import Window
w = Window().partitionBy("id").orderBy('Month')
df.groupBy("id", "Month").agg(F.sum(col("expense")).alias("total_expense")) \
.withColumn("rank", F.row_number().over(w)) \
.withColumn("two_month_rolling_sum", F.sum(col("total_expense")).over(w.rangeBetween(-1, 0))) \
.filter(col("rank")!=1)\
.drop("total_expense", "rank")\
.orderBy("id", "Month").show()
输出:
+---+-----+---------------------+
| id|Month|two_month_rolling_sum|
+---+-----+---------------------+
| 1| 2| 68.0|
| 1| 3| 43.0|
| 2| 2| 18.0|
| 2| 3| 53.0|
+---+-----+---------------------+