How to backfill null values in each partition in PySpark
I have the following DataFrame in PySpark:
Id   DateActual           DateStart            DateEnd              SourceCode
107  2019-08-11 00:00:00  null                 null                 1111
107  2019-08-16 00:00:00  2019-08-11 00:00:00  2019-08-18 00:00:00  1111
128  2019-02-11 00:00:00  null                 null                 101
128  2019-02-13 00:00:00  2019-02-11 00:00:00  2019-02-18 00:00:00  168
128  2019-02-14 00:00:00  2019-02-13 00:00:00  2019-02-20 00:00:00  187
I need to replace the null values to get the following result:
Id   DateActual           DateStart            DateEnd              SourceCode
107  2019-08-11 00:00:00  2019-08-11 00:00:00  2019-08-18 00:00:00  1111
107  2019-08-16 00:00:00  2019-08-11 00:00:00  2019-08-18 00:00:00  1111
128  2019-02-11 00:00:00  2019-02-11 00:00:00  2019-02-18 00:00:00  101
128  2019-02-13 00:00:00  2019-02-11 00:00:00  2019-02-18 00:00:00  168
128  2019-02-14 00:00:00  2019-02-13 00:00:00  2019-02-20 00:00:00  187
Basically, a row whose DateStart and DateEnd are null should take the DateStart and DateEnd of the NEXT row, provided that row has the same Id.
How can I fill in the null values in PySpark following the logic above?
The DataFrame:
df = (
    sc.parallelize([
        (107, "2019-08-11 00:00:00", None, None, 1111),
        (107, "2019-08-16 00:00:00", "2019-08-11 00:00:00", "2019-08-18 00:00:00", 1111),
        (128, "2019-02-11 00:00:00", None, None, 101),
        (128, "2019-02-13 00:00:00", "2019-02-11 00:00:00", "2019-02-18 00:00:00", 168),
        (128, "2019-02-14 00:00:00", "2019-02-13 00:00:00", "2019-02-20 00:00:00", 187)
    ]).toDF(["Id", "DateActual", "DateStart", "DateEnd", "SourceCode"])
)
Here is what I tried:
from pyspark.sql.functions import col, when
import pyspark.sql.functions as F
from pyspark.sql.window import Window

my_window = Window.partitionBy("Id").orderBy("DateActual")

df.withColumn(
    "DateStart_start",
    when(col("DateStart").isNull(), F.lag(df.DateStart).over(my_window))
    .otherwise(col("DateStart"))
).show()
I do not need a trivial solution like df.na.fill(0). I need to replace the null values with the values from the NEXT row, which presumably involves lag or a similar function.
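To illustrate the logic, here is a minimal sketch with coalesce and lead (an assumption on my part: the missing values can always be taken from the immediately following row, which breaks if several consecutive rows are null):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("Id").orderBy("DateActual")

# When DateStart/DateEnd is null, take the value from the next row within the same Id.
# lead() only looks one row ahead, so this does not cover runs of consecutive nulls.
df_filled = (
    df.withColumn("DateStart", F.coalesce(F.col("DateStart"), F.lead("DateStart", 1).over(w)))
      .withColumn("DateEnd", F.coalesce(F.col("DateEnd"), F.lead("DateEnd", 1).over(w)))
)
df_filled.show(truncate=False)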
Use first from pyspark.sql.functions:
import sys

from pyspark.sql import Window
from pyspark.sql.functions import first

# define the window: from the current row to the end of the partition
window = Window.partitionBy('Id')\
               .orderBy('DateActual')\
               .rowsBetween(0, sys.maxsize)

# define the back-filled columns: first non-null value looking forward
filled_column_start = first(spark_df['DateStart'], ignorenulls=True).over(window)
filled_column_end = first(spark_df['DateEnd'], ignorenulls=True).over(window)

# do the fill
spark_df_filled = spark_df.withColumn('filled_start', filled_column_start)
spark_df_filled = spark_df_filled.withColumn('filled_end', filled_column_end)

# show off our glorious achievements
spark_df_filled.orderBy('Id').show(10)
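Assuming spark_df above is the df from the question, the back-filled values can then be written over the original columns and the helper columns dropped; a sketch:

from pyspark.sql.functions import coalesce, col

# Overwrite the original columns with the forward-looking first() values,
# then drop the helper columns.
result = (
    spark_df_filled
    .withColumn('DateStart', coalesce(col('DateStart'), col('filled_start')))
    .withColumn('DateEnd', coalesce(col('DateEnd'), col('filled_end')))
    .drop('filled_start', 'filled_end')
)
result.orderBy('Id', 'DateActual').show(truncate=False)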