Pyspark 在列级别内向前和向后填充

Pyspark forward and backward fill within column level

我尝试在 pyspark 数据框中填充缺失的数据。 pyspark 数据框如下所示:

+---------+---------+-------------------+----+
| latitude|longitude|      timestamplast|name|
+---------+---------+-------------------+----+
|         | 4.905615|2019-08-01 00:00:00|   1|
|51.819645|         |2019-08-01 00:00:00|   1|
| 51.81964| 4.961713|2019-08-01 00:00:00|   2|
|         |         |2019-08-01 00:00:00|   3|
| 51.82918| 4.911187|                   |   3|
| 51.82385| 4.901488|2019-08-01 00:00:03|   5|
+---------+---------+-------------------+----+

在 "name" 列中,我想向前填充或向后填充(以必要者为准)以仅填充 "latitude" 和 "longitude"("timestamplast" 不应该填充)。我该怎么做?

输出将是:

+---------+---------+-------------------+----+
| latitude|longitude|      timestamplast|name|
+---------+---------+-------------------+----+
|51.819645| 4.905615|2019-08-01 00:00:00|   1|
|51.819645| 4.905615|2019-08-01 00:00:00|   1|
| 51.81964| 4.961713|2019-08-01 00:00:00|   2|
| 51.82918| 4.911187|2019-08-01 00:00:00|   3|
| 51.82918| 4.911187|                   |   3|
| 51.82385| 4.901488|2019-08-01 00:00:03|   5|
+---------+---------+-------------------+----+

在 Pandas 中,将这样做:

df = df.groupby("name")['longitude','latitude'].apply(lambda x : x.ffill().bfill())

这在 Pyspark 中如何完成?

我得到了一个用于向前或向后填充一个目标名称的有效解决方案 "longitude"。我想我也可以为 "latitude" 重复该过程,然后再次为向后填充重复该过程。有没有更有效的方法?

window = Window.partitionBy('name')\
               .orderBy('timestamplast')\
               .rowsBetween(-sys.maxsize, 0) # this is for forward fill  
               # .rowsBetween(0,sys.maxsize) # this is for backward fill  

# define the forward-filled column
filled_column = last(df['longitude'], ignorenulls=True).over(window)  # this is for forward fill  
# filled_column = first(df['longitude'], ignorenulls=True).over(window)  # this is for backward fill

df = df.withColumn('mmsi_filled', filled_column) # do the fill

我建议您使用以下两个 Window 规格:

from pyspark.sql import Window
w1 = Window.partitionBy('name').orderBy('timestamplast')
w2 = w1.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

其中:

  1. w1 是我们用来计算前向填充的常规 WinSpec,与以下内容相同:

    w1 = Window.partitionBy('name').orderBy('timestamplast').rowsBetween(Window.unboundedPreceding,0)
    

    有关默认 window 帧,请参阅 documentation 中的以下注释:

    Note: When ordering is not defined, an unbounded window frame (rowFrame, unboundedPreceding, unboundedFollowing) is used by default. When ordering is defined, a growing window frame (rangeFrame, unboundedPreceding, currentRow) is used by default.

  2. ffill之后,只要修复最前面的null值就可以了,所以我们可以使用一个固定的Window frame(Between Window.unboundedPreceding 和 Window.unboundedFollowing),这比使用 运行 Window 框架更有效,因为它只需要一个聚合,请参阅 SPARK-8638

那么x.ffill().bfill()可以根据上面两个WindowSpecs使用coalesce + last + first来处理:

from pyspark.sql.functions import coalesce, last, first

df.withColumn('latitude_new', coalesce(last('latitude',True).over(w1), first('latitude',True).over(w2))) \
  .select('name','timestamplast', 'latitude','latitude_new') \
  .show()
+----+-------------------+---------+------------+
|name|      timestamplast| latitude|latitude_new|
+----+-------------------+---------+------------+
|   1|2019-08-01 00:00:00|     null|   51.819645|
|   1|2019-08-01 00:00:01|     null|   51.819645|
|   1|2019-08-01 00:00:02|51.819645|   51.819645|
|   1|2019-08-01 00:00:03| 51.81964|    51.81964|
|   1|2019-08-01 00:00:04|     null|    51.81964|
|   1|2019-08-01 00:00:05|     null|    51.81964|
|   1|2019-08-01 00:00:06|     null|    51.81964|
|   1|2019-08-01 00:00:07| 51.82385|    51.82385|
+----+-------------------+---------+------------+

编辑: 在多列上处理 (ffill+bfill),使用列表理解:

cols = ['latitude', 'longitude']
df_new = df.select([ c for c in df.columns if c not in cols ] + [ coalesce(last(c,True).over(w1), first(c,True).over(w2)).alias(c) for c in cols ])