Pyspark 分区内移动最后 2 (N) 行的累积总和
Pyspark Cumulative sum within Partition for moving last 2 (N) rows
假设我有以下数据框,为了便于视觉排序:
我将如何利用 window 函数创建一个新列,对每个 period
分区中按 Month
列排序的前一行求和:
以下是我的尝试,但我显然在 rowsBetween
函数方面做错了。
df = df.withColumn('CustLast2', sum('Cust').over(Window.partitionBy("period").orderBy('Month').rowsBetween(Window.currentRow, -1))
我觉得你差不多就是他们的了,你只要把-1
换成Window.unboundedPreceding
df = df.withColumn('CustLast2', sum('Cust').over(Window.partitionBy("period").orderBy('Month').rowsBetween(Window.unboundedPreceding, Window.currentRow))
否则,您只是对同一时期内的连续 2 行求和。
我们制作了 Fugue project 以将本机 Python 或 Pandas 代码移植到 Spark 或 Dask。这使您可以通过用原生 Python 表达逻辑来保持逻辑的可读性。然后,Fugue 可以通过一个函数调用将它移植到 Spark。
首先我们从测试 Pandas DataFrame 开始(稍后我们将移植到 Spark):
import pandas as pd
df = pd.DataFrame({"date": ["2020-01-01", "2020-01-02", "2020-01-03"] * 3,
"period": [0,0,0,1,1,1,2,2,2],
"val": [4,5,2] * 3})
然后我们制作一个基于Pandas的函数。请注意,这是针对每个组应用的。稍后再分区。
def rolling(df: pd.DataFrame) -> pd.DataFrame:
df["cum_sum"] = df["val"].rolling(2).sum().fillna(df["val"])
return df
现在我们可以使用 Fugue 变换函数在 Pandas 上进行测试。此函数还处理分区和预排序。
from fugue import transform
transform(df, rolling, schema="*, cum_sum:float", partition={"by":"period", "presort": "date asc"})
因为这可行,我们只需指定引擎就可以将它引入 Spark:
import fugue_spark
transform(df, rolling, schema="*, cum_sum:float", partition={"by":"period", "presort": "date asc"}, engine="spark").show()
+----------+------+---+-------+
| date|period|val|cum_sum|
+----------+------+---+-------+
|2020-01-01| 0| 4| 4.0|
|2020-01-02| 0| 5| 9.0|
|2020-01-03| 0| 2| 7.0|
|2020-01-01| 1| 4| 4.0|
|2020-01-02| 1| 5| 9.0|
|2020-01-03| 1| 2| 7.0|
|2020-01-01| 2| 4| 4.0|
|2020-01-02| 2| 5| 9.0|
|2020-01-03| 2| 2| 7.0|
+----------+------+---+-------+
请注意,由于 Spark 的惰性求值,您现在需要 .show()
。 Fugue 转换函数可以接受 Pandas 和 Spark DataFrames,并将输出
您可以使用以下代码解决此问题:
(df.withColumn('last_value', F.lag(F.col('Cust')).over(W.partitionBy(['Period']).orderBy(F.col('Month'))))
.withColumn('last_value', F.when(F.col('last_value').isNull(), 0).otherwise(F.col('last_value')))
.withColumn('cumSum', F.col('Cust') + F.col('last_value')))
您想要的是对最后 2 行(包括当前行)求和,因此只需像这样指定 rowsBetween
:
from pyspark.sql import functions as F, Window
w = Window.partitionBy('Period').orderBy('Month').rowsBetween(-1, Window.currentRow)
df = df.withColumn('CustLast2', F.sum('Cust').over(w))
您在尝试中颠倒了 window 框架的下限和上限。
假设我有以下数据框,为了便于视觉排序:
我将如何利用 window 函数创建一个新列,对每个 period
分区中按 Month
列排序的前一行求和:
以下是我的尝试,但我显然在 rowsBetween
函数方面做错了。
df = df.withColumn('CustLast2', sum('Cust').over(Window.partitionBy("period").orderBy('Month').rowsBetween(Window.currentRow, -1))
我觉得你差不多就是他们的了,你只要把-1
换成Window.unboundedPreceding
df = df.withColumn('CustLast2', sum('Cust').over(Window.partitionBy("period").orderBy('Month').rowsBetween(Window.unboundedPreceding, Window.currentRow))
否则,您只是对同一时期内的连续 2 行求和。
我们制作了 Fugue project 以将本机 Python 或 Pandas 代码移植到 Spark 或 Dask。这使您可以通过用原生 Python 表达逻辑来保持逻辑的可读性。然后,Fugue 可以通过一个函数调用将它移植到 Spark。
首先我们从测试 Pandas DataFrame 开始(稍后我们将移植到 Spark):
import pandas as pd
df = pd.DataFrame({"date": ["2020-01-01", "2020-01-02", "2020-01-03"] * 3,
"period": [0,0,0,1,1,1,2,2,2],
"val": [4,5,2] * 3})
然后我们制作一个基于Pandas的函数。请注意,这是针对每个组应用的。稍后再分区。
def rolling(df: pd.DataFrame) -> pd.DataFrame:
df["cum_sum"] = df["val"].rolling(2).sum().fillna(df["val"])
return df
现在我们可以使用 Fugue 变换函数在 Pandas 上进行测试。此函数还处理分区和预排序。
from fugue import transform
transform(df, rolling, schema="*, cum_sum:float", partition={"by":"period", "presort": "date asc"})
因为这可行,我们只需指定引擎就可以将它引入 Spark:
import fugue_spark
transform(df, rolling, schema="*, cum_sum:float", partition={"by":"period", "presort": "date asc"}, engine="spark").show()
+----------+------+---+-------+
| date|period|val|cum_sum|
+----------+------+---+-------+
|2020-01-01| 0| 4| 4.0|
|2020-01-02| 0| 5| 9.0|
|2020-01-03| 0| 2| 7.0|
|2020-01-01| 1| 4| 4.0|
|2020-01-02| 1| 5| 9.0|
|2020-01-03| 1| 2| 7.0|
|2020-01-01| 2| 4| 4.0|
|2020-01-02| 2| 5| 9.0|
|2020-01-03| 2| 2| 7.0|
+----------+------+---+-------+
请注意,由于 Spark 的惰性求值,您现在需要 .show()
。 Fugue 转换函数可以接受 Pandas 和 Spark DataFrames,并将输出
您可以使用以下代码解决此问题:
(df.withColumn('last_value', F.lag(F.col('Cust')).over(W.partitionBy(['Period']).orderBy(F.col('Month'))))
.withColumn('last_value', F.when(F.col('last_value').isNull(), 0).otherwise(F.col('last_value')))
.withColumn('cumSum', F.col('Cust') + F.col('last_value')))
您想要的是对最后 2 行(包括当前行)求和,因此只需像这样指定 rowsBetween
:
from pyspark.sql import functions as F, Window
w = Window.partitionBy('Period').orderBy('Month').rowsBetween(-1, Window.currentRow)
df = df.withColumn('CustLast2', F.sum('Cust').over(w))
您在尝试中颠倒了 window 框架的下限和上限。