Pyspark 分区内移动最后 2 (N) 行的累积总和

Pyspark Cumulative sum within Partition for moving last 2 (N) rows

假设我有以下数据框,为了便于视觉排序:

我将如何利用 window 函数创建一个新列,对每个 period 分区中按 Month 列排序的前一行求和:

以下是我的尝试,但我显然在 rowsBetween 函数方面做错了。

df = df.withColumn('CustLast2', sum('Cust').over(Window.partitionBy("period").orderBy('Month').rowsBetween(Window.currentRow, -1))

我觉得你差不多就是他们的了,你只要把-1换成Window.unboundedPreceding

df = df.withColumn('CustLast2', sum('Cust').over(Window.partitionBy("period").orderBy('Month').rowsBetween(Window.unboundedPreceding, Window.currentRow))

否则,您只是对同一时期内的连续 2 行求和。

我们制作了 Fugue project 以将本机 Python 或 Pandas 代码移植到 Spark 或 Dask。这使您可以通过用原生 Python 表达逻辑来保持逻辑的可读性。然后,Fugue 可以通过一个函数调用将它移植到 Spark。

首先我们从测试 Pandas DataFrame 开始(稍后我们将移植到 Spark):

import pandas as pd
df = pd.DataFrame({"date": ["2020-01-01", "2020-01-02", "2020-01-03"] * 3, 
                   "period": [0,0,0,1,1,1,2,2,2],
                   "val": [4,5,2] * 3})

然后我们制作一个基于Pandas的函数。请注意,这是针对每个组应用的。稍后再分区。

def rolling(df: pd.DataFrame) -> pd.DataFrame:
    df["cum_sum"] = df["val"].rolling(2).sum().fillna(df["val"])
    return df

现在我们可以使用 Fugue 变换函数在 Pandas 上进行测试。此函数还处理分区和预排序。

from fugue import transform
transform(df, rolling, schema="*, cum_sum:float", partition={"by":"period", "presort": "date asc"})

因为这可行,我们只需指定引擎就可以将它引入 Spark:

import fugue_spark
transform(df, rolling, schema="*, cum_sum:float", partition={"by":"period", "presort": "date asc"}, engine="spark").show()
+----------+------+---+-------+
|      date|period|val|cum_sum|
+----------+------+---+-------+
|2020-01-01|     0|  4|    4.0|
|2020-01-02|     0|  5|    9.0|
|2020-01-03|     0|  2|    7.0|
|2020-01-01|     1|  4|    4.0|
|2020-01-02|     1|  5|    9.0|
|2020-01-03|     1|  2|    7.0|
|2020-01-01|     2|  4|    4.0|
|2020-01-02|     2|  5|    9.0|
|2020-01-03|     2|  2|    7.0|
+----------+------+---+-------+

请注意,由于 Spark 的惰性求值,您现在需要 .show()。 Fugue 转换函数可以接受 Pandas 和 Spark DataFrames,并将输出

您可以使用以下代码解决此问题:

(df.withColumn('last_value', F.lag(F.col('Cust')).over(W.partitionBy(['Period']).orderBy(F.col('Month'))))
   .withColumn('last_value', F.when(F.col('last_value').isNull(), 0).otherwise(F.col('last_value')))
   .withColumn('cumSum', F.col('Cust') + F.col('last_value')))

您想要的是对最后 2 行(包括当前行)求和,因此只需像这样指定 rowsBetween

from pyspark.sql import functions as F, Window

w = Window.partitionBy('Period').orderBy('Month').rowsBetween(-1, Window.currentRow)

df = df.withColumn('CustLast2', F.sum('Cust').over(w))

您在尝试中颠倒了 window 框架的下限和上限。