How to backfill null values in each partition in PySpark

I have the following DataFrame in PySpark:

Id   DateActual           DateStart            DateEnd              SourceCode
107  2019-08-11 00:00:00  null                 null                 1111
107  2019-08-16 00:00:00  2019-08-11 00:00:00  2019-08-18 00:00:00  1111
128  2019-02-11 00:00:00  null                 null                 101
128  2019-02-13 00:00:00  2019-02-11 00:00:00  2019-02-18 00:00:00  168
128  2019-02-14 00:00:00  2019-02-13 00:00:00  2019-02-20 00:00:00  187

I need to replace the null values to get the following result:

Id   DateActual           DateStart            DateEnd              SourceCode
107  2019-08-11 00:00:00  2019-08-11 00:00:00  2019-08-18 00:00:00  1111
107  2019-08-16 00:00:00  2019-08-11 00:00:00  2019-08-18 00:00:00  1111
128  2019-02-11 00:00:00  2019-02-11 00:00:00  2019-02-18 00:00:00  101
128  2019-02-13 00:00:00  2019-02-11 00:00:00  2019-02-18 00:00:00  168
128  2019-02-14 00:00:00  2019-02-13 00:00:00  2019-02-20 00:00:00  187

Basically, where DateStart and DateEnd are null, they should take the DateStart and DateEnd values of the NEXT row, provided that row has the same Id.

How can I fill the null values in PySpark following the logic above?

The DataFrame:

df = (
    sc.parallelize([
        (107, "2019-08-11 00:00:00", None, None, 1111),
        (107, "2019-08-16 00:00:00", "2019-08-11 00:00:00", "2019-08-18 00:00:00", 1111),
        (128, "2019-02-11 00:00:00", None, None, 101),
        (128, "2019-02-13 00:00:00", "2019-02-11 00:00:00", "2019-02-18 00:00:00", 168),
        (128, "2019-02-14 00:00:00", "2019-02-13 00:00:00", "2019-02-20 00:00:00", 187)
    ]).toDF(["Id", "DateActual", "DateStart", "DateEnd", "SourceCode"])
)

Here is what I tried:

from pyspark.sql.functions import col, when 
import pyspark.sql.functions as F
from pyspark.sql.window import Window  

my_window = Window.partitionBy("Id").orderBy("DateActual")

df.withColumn("DateStart_start", when(col("DateStart").isNull(), F.lag(df.DateStart).over(my_window)).otherwise(col("DateStart"))).show()

I don't need a simple solution like df.na.fill(0). I need to replace the null values with the NEXT ROW's values, which presumably means using lag or a similar function.

Use first from pyspark.sql.functions:

from pyspark.sql import Window
from pyspark.sql.functions import first

# define the window: within each Id, ordered by date,
# spanning the current row through the end of the partition
window = (
    Window.partitionBy('Id')
          .orderBy('DateActual')
          .rowsBetween(Window.currentRow, Window.unboundedFollowing)
)

# define the back-filled columns: first non-null value at or after the current row
# (df is the DataFrame from the question)
filled_column_start = first(df['DateStart'], ignorenulls=True).over(window)
filled_column_end = first(df['DateEnd'], ignorenulls=True).over(window)

# do the fill
df_filled = df.withColumn('filled_start', filled_column_start)
df_filled = df_filled.withColumn('filled_end', filled_column_end)

# show off our glorious achievements
df_filled.orderBy('Id', 'DateActual').show(10)
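
To make it easier to see what the window above computes, here is a minimal plain-Python sketch of the same idea, with no Spark involved: within each Id, ordered by DateActual, every null is replaced by the first non-null value found at or after that row. The helper names backfill and backfill_by_id are hypothetical, and the rows are the ones from the question's table. It is only meant to illustrate the semantics, not to replace the window function.

```python
from itertools import groupby
from operator import itemgetter

# Sample rows from the question: (Id, DateActual, DateStart, DateEnd, SourceCode)
rows = [
    (107, "2019-08-11 00:00:00", None, None, 1111),
    (107, "2019-08-16 00:00:00", "2019-08-11 00:00:00", "2019-08-18 00:00:00", 1111),
    (128, "2019-02-11 00:00:00", None, None, 101),
    (128, "2019-02-13 00:00:00", "2019-02-11 00:00:00", "2019-02-18 00:00:00", 168),
    (128, "2019-02-14 00:00:00", "2019-02-13 00:00:00", "2019-02-20 00:00:00", 187),
]


def backfill(values):
    """Replace each None with the nearest non-None value that follows it."""
    filled = list(values)
    next_seen = None
    # Walk backwards so the nearest following non-null value is always known.
    for i in range(len(filled) - 1, -1, -1):
        if filled[i] is None:
            filled[i] = next_seen  # stays None if nothing non-null follows
        else:
            next_seen = filled[i]
    return filled


def backfill_by_id(rows):
    """Backfill DateStart and DateEnd within each Id, ordered by DateActual."""
    result = []
    # Sorting by (Id, DateActual) mirrors partitionBy('Id').orderBy('DateActual');
    # the timestamp strings sort correctly because they are in ISO-like format.
    ordered = sorted(rows, key=itemgetter(0, 1))
    for _, group in groupby(ordered, key=itemgetter(0)):
        group = list(group)
        starts = backfill([r[2] for r in group])
        ends = backfill([r[3] for r in group])
        for r, start, end in zip(group, starts, ends):
            result.append((r[0], r[1], start, end, r[4]))
    return result


filled_rows = backfill_by_id(rows)
for row in filled_rows:
    print(row)
```

Note that a null in the last row of a partition stays null, because there is no later value to take. As far as I can tell, the window version behaves the same way in that case.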