How to do Forward fill in Pyspark on multiple columns

I want to do a forward fill in PySpark on multiple columns. If a column has no earlier value to carry forward (i.e. it starts as "NaN"/null), it should be replaced with 0. Below is what my DF looks like.

+-------------------+-------+-------+-------+-------+
|start_timestamp    |Column1|Column2|Column3|Column4|
+-------------------+-------+-------+-------+-------+
|2020-11-02 08:51:50|2      |null   |null   |null   |
|2020-11-02 09:14:29|null   |null   |null   |40     |
|2020-11-02 09:18:32|null   |4      |2      |null   |
|2020-11-02 09:32:42|4      |null   |null   |null   |
|2020-11-03 13:06:03|null   |null   |null   |20     |
|2020-11-03 13:10:01|6      |null   |4      |null   |
|2020-11-03 13:54:38|null   |5      |null   |null   |
|2020-11-03 14:46:25|null   |null   |null   |null   |
|2020-11-03 14:57:31|7      |null   |null   |10     |
|2020-11-03 15:07:07|8      |7      |null   |null   |
+-------------------+-------+-------+-------+-------+

The expected DF is:

+-------------------+-------+-------+-------+-------+
|start_timestamp    |Column1|Column2|Column3|Column4|
+-------------------+-------+-------+-------+-------+
|2020-11-02 08:51:50|2      |0      |0      |0      |
|2020-11-02 09:14:29|2      |0      |0      |40     |
|2020-11-02 09:18:32|2      |4      |2      |40     |
|2020-11-02 09:32:42|4      |4      |2      |40     |
|2020-11-03 13:06:03|4      |4      |2      |20     |
|2020-11-03 13:10:01|6      |4      |4      |20     |
|2020-11-03 13:54:38|6      |5      |4      |20     |
|2020-11-03 14:46:25|6      |5      |4      |20     |
|2020-11-03 14:57:31|7      |5      |4      |10     |
|2020-11-03 15:07:07|8      |7      |4      |10     |
+-------------------+-------+-------+-------+-------+

Below is the code I tried, taken from Stack Overflow:

from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import last, first
from pyspark.sql.functions import col, max as max_, min as min_
import sys

def stringReplaceFunc(x, y):
    return F.when(x != y, x).otherwise(F.lit(None))  # replace with NULL

def forwardFillImputer(df, cols=[], partitioner="start_timestamp", value="null"):
    for i in cols:
        window = Window\
            .partitionBy(F.month(partitioner))\
            .orderBy(partitioner)\
            .rowsBetween(-sys.maxsize, 0)
        df = df.withColumn(i, stringReplaceFunc(F.col(i), value))
        fill = F.last(df[i], ignorenulls=True).over(window)
        df = df.withColumn(i, fill)
        return df

df = forwardFillImputer(df, cols=[i for i in df.columns])

The code fails to run. Please let me know what I am doing wrong, and if there is any alternative solution, please share it. Thanks.

In your current code, you should not partition the window by month, and using rowsBetween is useless: you should simply order a single window by start_timestamp.

Also, you are not handling the case where there is no previous value to carry forward. You can manage it with coalesce and the literal value '0'.
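The fill-with-default logic can be illustrated in plain Python (a sketch for intuition only, not the Spark implementation): for each cell, keep the current value if present, otherwise take the last non-null value seen so far, otherwise fall back to the default 0 — the same priority order as coalesce(col, last(col), lit(0)).

```python
def forward_fill(values, default=0):
    """Carry the last non-null value forward; use `default` when
    no previous value exists (mirrors coalesce(col, last, lit(0)))."""
    filled, last_seen = [], None
    for v in values:
        if v is not None:  # the current value wins, like coalesce's first argument
            last_seen = v
        filled.append(last_seen if last_seen is not None else default)
    return filled

print(forward_fill([None, None, 4, None, 5, None]))  # [0, 0, 4, 4, 5, 5]
```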

Therefore, your code can be rewritten as follows:

from pyspark.sql import functions as F
from pyspark.sql import Window

def forwardFillImputer(df, cols=[], partitioner='start_timestamp', value='null'):
    for c in cols:
        # turn the sentinel value into a real NULL
        df = df.withColumn(c, F.when(F.col(c) != value, F.col(c)))
        # keep the current value, else the last non-null value ordered
        # by the timestamp, else the literal '0'
        df = df.withColumn(c, F.coalesce(F.col(c), F.last(c, True).over(Window.orderBy(partitioner)), F.lit('0')))
    return df

df = forwardFillImputer(df, df.columns)

With the following dataframe as df:

+-------------------+-------+-------+-------+-------+
|start_timestamp    |Column1|Column2|Column3|Column4|
+-------------------+-------+-------+-------+-------+
|2020-11-02 08:51:50|2      |null   |null   |null   |
|2020-11-02 09:14:29|null   |null   |null   |40     |
|2020-11-02 09:18:32|null   |4      |2      |null   |
|2020-11-02 09:32:42|4      |null   |null   |null   |
|2020-11-03 13:06:03|null   |null   |null   |20     |
|2020-11-03 13:10:01|6      |null   |4      |null   |
|2020-11-03 13:54:38|null   |5      |null   |null   |
|2020-11-03 14:46:25|null   |null   |null   |null   |
|2020-11-03 14:57:31|7      |null   |null   |10     |
|2020-11-03 15:07:07|8      |7      |null   |null   |
+-------------------+-------+-------+-------+-------+

you get the following output:

+-------------------+-------+-------+-------+-------+
|start_timestamp    |Column1|Column2|Column3|Column4|
+-------------------+-------+-------+-------+-------+
|2020-11-02 08:51:50|2      |0      |0      |0      |
|2020-11-02 09:14:29|2      |0      |0      |40     |
|2020-11-02 09:18:32|2      |4      |2      |40     |
|2020-11-02 09:32:42|4      |4      |2      |40     |
|2020-11-03 13:06:03|4      |4      |2      |20     |
|2020-11-03 13:10:01|6      |4      |4      |20     |
|2020-11-03 13:54:38|6      |5      |4      |20     |
|2020-11-03 14:46:25|6      |5      |4      |20     |
|2020-11-03 14:57:31|7      |5      |4      |10     |
|2020-11-03 15:07:07|8      |7      |4      |10     |
+-------------------+-------+-------+-------+-------+