Pyspark forward and backward fill within column level
I am trying to fill missing data in a PySpark dataframe. The dataframe looks like this:
+---------+---------+-------------------+----+
| latitude|longitude| timestamplast|name|
+---------+---------+-------------------+----+
| | 4.905615|2019-08-01 00:00:00| 1|
|51.819645| |2019-08-01 00:00:00| 1|
| 51.81964| 4.961713|2019-08-01 00:00:00| 2|
| | |2019-08-01 00:00:00| 3|
| 51.82918| 4.911187| | 3|
| 51.82385| 4.901488|2019-08-01 00:00:03| 5|
+---------+---------+-------------------+----+
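For reproduction, here is a minimal sketch that builds this dataframe (the timestamp column is kept as a string for brevity; None marks the missing values):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample data matching the table above; None marks the missing values.
data = [
    (None,      4.905615, "2019-08-01 00:00:00", 1),
    (51.819645, None,     "2019-08-01 00:00:00", 1),
    (51.81964,  4.961713, "2019-08-01 00:00:00", 2),
    (None,      None,     "2019-08-01 00:00:00", 3),
    (51.82918,  4.911187, None,                  3),
    (51.82385,  4.901488, "2019-08-01 00:00:03", 5),
]
df = spark.createDataFrame(data, ["latitude", "longitude", "timestamplast", "name"])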
在 "name" 列中,我想向前填充或向后填充(以必要者为准)以仅填充 "latitude" 和 "longitude"("timestamplast" 不应该填充)。我该怎么做?
The expected output would be:
+---------+---------+-------------------+----+
| latitude|longitude| timestamplast|name|
+---------+---------+-------------------+----+
|51.819645| 4.905615|2019-08-01 00:00:00| 1|
|51.819645| 4.905615|2019-08-01 00:00:00| 1|
| 51.81964| 4.961713|2019-08-01 00:00:00| 2|
| 51.82918| 4.911187|2019-08-01 00:00:00| 3|
| 51.82918| 4.911187| | 3|
| 51.82385| 4.901488|2019-08-01 00:00:03| 5|
+---------+---------+-------------------+----+
In Pandas this would be done as follows:
df = df.groupby("name")[['longitude', 'latitude']].apply(lambda x: x.ffill().bfill())
How can this be done in Pyspark?
I have a working solution for forward or backward filling a single target column, "longitude". I suppose I could repeat the procedure for "latitude" and then again for the backward fill. Is there a more efficient way?
import sys

from pyspark.sql import Window
from pyspark.sql.functions import last, first

window = Window.partitionBy('name')\
               .orderBy('timestamplast')\
               .rowsBetween(-sys.maxsize, 0)    # this is for forward fill
               # .rowsBetween(0, sys.maxsize)   # this is for backward fill

# define the forward-filled column
filled_column = last(df['longitude'], ignorenulls=True).over(window)     # this is for forward fill
# filled_column = first(df['longitude'], ignorenulls=True).over(window)  # this is for backward fill

df = df.withColumn('mmsi_filled', filled_column)  # do the fill
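As a side note, the same frames can be expressed with the built-in Window bounds instead of sys.maxsize (a small variant, not part of the original attempt):
# Equivalent frames using the Window constants instead of sys.maxsize
w_ffill = Window.partitionBy('name').orderBy('timestamplast') \
                .rowsBetween(Window.unboundedPreceding, Window.currentRow)   # forward fill
w_bfill = Window.partitionBy('name').orderBy('timestamplast') \
                .rowsBetween(Window.currentRow, Window.unboundedFollowing)   # backward fill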
I would suggest using the following two Window specs:
from pyspark.sql import Window
w1 = Window.partitionBy('name').orderBy('timestamplast')
w2 = w1.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
Where:
w1 is the regular WindowSpec we use to calculate the forward-fill, and it is the same as the following:
w1 = Window.partitionBy('name').orderBy('timestamplast').rowsBetween(Window.unboundedPreceding,0)
Regarding the default window frame, see the following note in the documentation:
Note: When ordering is not defined, an unbounded window frame (rowFrame, unboundedPreceding, unboundedFollowing) is used by default. When ordering is defined, a growing window frame (rangeFrame, unboundedPreceding, currentRow) is used by default.
After the ffill, only the leading null values remain to be fixed, so we can use a fixed Window frame (between Window.unboundedPreceding and Window.unboundedFollowing), which is more efficient than a running Window frame because it requires only one aggregate; see SPARK-8638.
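For illustration, a minimal sketch of just that second step (the leading-null fix), using the column from the example:
from pyspark.sql.functions import first

# Over the fixed frame w2, first(..., ignorenulls=True) returns the first
# non-null value of the whole partition, which is exactly what the remaining
# leading nulls need after the forward fill; it is computed once per partition.
leading_fix = first('latitude', ignorenulls=True).over(w2)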
Then x.ffill().bfill() can be handled with coalesce + last + first based on the two WindowSpecs above:
from pyspark.sql.functions import coalesce, last, first

df.withColumn('latitude_new', coalesce(last('latitude', True).over(w1), first('latitude', True).over(w2))) \
  .select('name', 'timestamplast', 'latitude', 'latitude_new') \
  .show()
+----+-------------------+---------+------------+
|name| timestamplast| latitude|latitude_new|
+----+-------------------+---------+------------+
| 1|2019-08-01 00:00:00| null| 51.819645|
| 1|2019-08-01 00:00:01| null| 51.819645|
| 1|2019-08-01 00:00:02|51.819645| 51.819645|
| 1|2019-08-01 00:00:03| 51.81964| 51.81964|
| 1|2019-08-01 00:00:04| null| 51.81964|
| 1|2019-08-01 00:00:05| null| 51.81964|
| 1|2019-08-01 00:00:06| null| 51.81964|
| 1|2019-08-01 00:00:07| 51.82385| 51.82385|
+----+-------------------+---------+------------+
Edit: to handle (ffill+bfill) on multiple columns, use a list comprehension:
cols = ['latitude', 'longitude']
df_new = df.select(
    [c for c in df.columns if c not in cols] +
    [coalesce(last(c, True).over(w1), first(c, True).over(w2)).alias(c) for c in cols]
)
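Note that this select moves the filled columns to the end; if the original column order matters, it can be restored afterwards (a small optional step, not part of the original answer):
# Restore the original column order after the fill (optional)
df_new = df_new.select(df.columns)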