How to do forward fill in PySpark on multiple columns
I want to do a forward fill on multiple columns in PySpark. If a column's leading values are null, they should be replaced with 0. Below is what my DF looks like.
start_timestamp | Column1 | Column2 | Column3 | Column4
---|---|---|---|---
2020-11-02 08:51:50 | 2 | null | null | null
2020-11-02 09:14:29 | null | null | null | 40
2020-11-02 09:18:32 | null | 4 | 2 | null
2020-11-02 09:32:42 | 4 | null | null | null
2020-11-03 13:06:03 | null | null | null | 20
2020-11-03 13:10:01 | 6 | null | 4 | null
2020-11-03 13:54:38 | null | 5 | null | null
2020-11-03 14:46:25 | null | null | null | null
2020-11-03 14:57:31 | 7 | null | null | 10
2020-11-03 15:07:07 | 8 | 7 | null | null
The expected DF is:
start_timestamp | Column1 | Column2 | Column3 | Column4
---|---|---|---|---
2020-11-02 08:51:50 | 2 | 0 | 0 | 0
2020-11-02 09:14:29 | 2 | 0 | 0 | 40
2020-11-02 09:18:32 | 2 | 4 | 2 | 40
2020-11-02 09:32:42 | 4 | 4 | 2 | 40
2020-11-03 13:06:03 | 4 | 4 | 2 | 20
2020-11-03 13:10:01 | 6 | 4 | 4 | 20
2020-11-03 13:54:38 | 6 | 5 | 4 | 20
2020-11-03 14:46:25 | 6 | 5 | 4 | 20
2020-11-03 14:57:31 | 7 | 5 | 4 | 10
2020-11-03 15:07:07 | 8 | 7 | 4 | 10
Below is the code I found on Stack Overflow and tried:
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import last, first
from pyspark.sql.functions import col, max as max_, min as min_
import sys

def stringReplaceFunc(x, y):
    return F.when(x != y, x).otherwise(F.lit(None))  # replace with NULL

def forwardFillImputer(df, cols=[], partitioner="start_timestamp", value="null"):
    for i in cols:
        window = Window\
            .partitionBy(F.month(partitioner))\
            .orderBy(partitioner)\
            .rowsBetween(-sys.maxsize, 0)
        df = df\
            .withColumn(i, stringReplaceFunc(F.col(i), value))
        fill = F.last(df[i], ignorenulls=True).over(window)
        df = df.withColumn(i, fill)
    return df

df = forwardFillImputer(df, cols=[i for i in df.columns])
The code does not run. Please let me know what mistake I am making, or whether there is an alternative solution. Thanks.
In your current code, you should not partition the window by month, and using rowsBetween there is unnecessary. You only need a single window ordered by start_timestamp.
Furthermore, you are not handling the case where there is no previous value at all. You can manage that with coalesce and the literal value '0'.
Your code can therefore be rewritten as follows:
from pyspark.sql import functions as F
from pyspark.sql import Window

def forwardFillImputer(df, cols=[], partitioner='start_timestamp', value='null'):
    for c in cols:
        df = df.withColumn(c, F.when(F.col(c) != value, F.col(c)))
        df = df.withColumn(c, F.coalesce(
            F.col(c),
            F.last(c, True).over(Window.orderBy(partitioner)),
            F.lit('0')))
    return df

df = forwardFillImputer(df, df.columns)
With the following dataframe as df:
+-------------------+-------+-------+-------+-------+
|start_timestamp |Column1|Column2|Column3|Column4|
+-------------------+-------+-------+-------+-------+
|2020-11-02 08:51:50|2 |null |null |null |
|2020-11-02 09:14:29|null |null |null |40 |
|2020-11-02 09:18:32|null |4 |2 |null |
|2020-11-02 09:32:42|4 |null |null |null |
|2020-11-03 13:06:03|null |null |null |20 |
|2020-11-03 13:10:01|6 |null |4 |null |
|2020-11-03 13:54:38|null |5 |null |null |
|2020-11-03 14:46:25|null |null |null |null |
|2020-11-03 14:57:31|7 |null |null |10 |
|2020-11-03 15:07:07|8 |7 |null |null |
+-------------------+-------+-------+-------+-------+
you get the following output:
+-------------------+-------+-------+-------+-------+
|start_timestamp |Column1|Column2|Column3|Column4|
+-------------------+-------+-------+-------+-------+
|2020-11-02 08:51:50|2 |0 |0 |0 |
|2020-11-02 09:14:29|2 |0 |0 |40 |
|2020-11-02 09:18:32|2 |4 |2 |40 |
|2020-11-02 09:32:42|4 |4 |2 |40 |
|2020-11-03 13:06:03|4 |4 |2 |20 |
|2020-11-03 13:10:01|6 |4 |4 |20 |
|2020-11-03 13:54:38|6 |5 |4 |20 |
|2020-11-03 14:46:25|6 |5 |4 |20 |
|2020-11-03 14:57:31|7 |5 |4 |10 |
|2020-11-03 15:07:07|8 |7 |4 |10 |
+-------------------+-------+-------+-------+-------+
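For intuition, the forward fill that `F.last(c, True).over(Window.orderBy(...))` combined with `F.coalesce(..., F.lit('0'))` performs on each ordered column can be sketched in plain Python (no Spark required; the helper name `forward_fill` and the string-typed values are illustrative assumptions, matching the string columns above):

```python
def forward_fill(values, default='0'):
    """Carry the last non-null value forward; rows before the first
    non-null value fall back to `default`, mirroring the coalesce
    with F.lit('0') in the Spark version."""
    filled, last_seen = [], None
    for v in values:
        if v is not None:
            last_seen = v  # F.last(..., ignorenulls=True) over the ordered window
        filled.append(last_seen if last_seen is not None else default)
    return filled

# Column2 from the example, ordered by start_timestamp
column2 = [None, None, '4', None, None, None, '5', None, None, '7']
print(forward_fill(column2))
# ['0', '0', '4', '4', '4', '4', '5', '5', '5', '7']
```

Note that `Window.orderBy(partitioner)` without a `partitionBy` pulls the whole dataframe onto a single partition, which is fine for small data but will not scale; for large data you would partition by some grouping key as well.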