PySpark window function - within n months from current row

When the current row's target equals 1, I want to remove all rows within x months of it (both before and after, based on date).

For example, given this PySpark df:

id date target
a "2020-01-01" 0
a "2020-02-01" 0
a "2020-03-01" 0
a "2020-04-01" 1
a "2020-05-01" 0
a "2020-06-01" 0
a "2020-07-01" 0
a "2020-08-01" 0
a "2020-09-01" 0
a "2020-10-01" 1
a "2020-11-01" 0
b "2020-01-01" 0
b "2020-02-01" 0
b "2020-03-01" 0
b "2020-05-01" 1

(Note that id b has no row for April.)

With an x value of 2, the resulting df would be:

id date target
a "2020-01-01" 0
a "2020-04-01" 1
a "2020-07-01" 0
a "2020-10-01" 1
b "2020-01-01" 0
b "2020-02-01" 0
b "2020-05-01" 1

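I can remove the xth row before and after the row of interest using the code below, but I want to remove all rows between the current row and x months away in both directions, based on date.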

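from pyspark.sql import Window
from pyspark.sql.functions import col, lag, lead

window = 2
windowSpec = Window.partitionBy("id").orderBy(['id','date'])

df = df.withColumn("lagvalue", lag('target', window).over(windowSpec))
df = df.withColumn("leadvalue", lead('target', window).over(windowSpec))
# Parentheses are required here because & binds more tightly than ==
df = df.where((col("lagvalue") == 0) & (col("leadvalue") == 0))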

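In your case, rangeBetween can be very useful. It looks at the values of the ordering column rather than at row positions, and takes only the rows whose value falls inside the range. For example, rangeBetween(-2, 2) takes every row whose ordering value lies between 2 below and 2 above that of the current row. Since rangeBetween does not work with dates (or strings), I converted them to numbers using months_between (for first-of-month dates, as here, the result is a whole number of months).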
from pyspark.sql import functions as F, Window
df = spark.createDataFrame(
    [('a', '2020-01-01', 0),
     ('a', '2020-02-01', 0),
     ('a', '2020-03-01', 0),
     ('a', '2020-04-01', 1),
     ('a', '2020-05-01', 0),
     ('a', '2020-06-01', 0),
     ('a', '2020-07-01', 0),
     ('a', '2020-08-01', 0),
     ('a', '2020-09-01', 0),
     ('a', '2020-10-01', 1),
     ('a', '2020-11-01', 0),
     ('b', '2020-01-01', 0),
     ('b', '2020-02-01', 0),
     ('b', '2020-03-01', 0),
     ('b', '2020-05-01', 1)],
    ['id', 'date', 'target']
)
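window = 2
# Order by the month count since 1970-01-01 so the range frame measures distance in months
windowSpec = Window.partitionBy('id').orderBy(F.months_between('date', F.lit('1970-01-01'))).rangeBetween(-window, window)
# Sum of target over the frame, minus the current row's own target
df = df.withColumn('to_remove', F.sum('target').over(windowSpec) - F.col('target'))
# Keep only rows with no other target == 1 row inside the frame
df = df.where(F.col('to_remove') == 0).drop('to_remove')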
df.show()
# +---+----------+------+
# | id|      date|target|
# +---+----------+------+
# |  a|2020-01-01|     0|
# |  a|2020-04-01|     1|
# |  a|2020-07-01|     0|
# |  a|2020-10-01|     1|
# |  b|2020-01-01|     0|
# |  b|2020-02-01|     0|
# |  b|2020-05-01|     1|
# +---+----------+------+
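
To see why a range frame handles the missing April row for id b where lag/lead offsets do not, here is a minimal plain-Python sketch of the same filter, with no Spark involved. The helper names month_index and drop_near_targets are made up for this illustration, and month_index only mimics months_between for first-of-month dates.

```python
from datetime import date

def month_index(d: date) -> int:
    """Whole months since 1970-01 (hypothetical stand-in for months_between
    against '1970-01-01' when every date is the first of a month)."""
    return (d.year - 1970) * 12 + (d.month - 1)

def drop_near_targets(rows, window):
    """Keep rows that have no *other* target == 1 row within `window` months
    for the same id. Mirrors sum(target) over the range frame minus target."""
    kept = []
    for rid, d, target in rows:
        # Range frame: same id, month index within +/- window of the current row
        frame_sum = sum(
            t for other_id, other_d, t in rows
            if other_id == rid
            and abs(month_index(other_d) - month_index(d)) <= window
        )
        # Subtract the row's own target so a target row does not remove itself
        if frame_sum - target == 0:
            kept.append((rid, d, target))
    return kept

# id b from the question: there is no April row
rows = [
    ('b', date(2020, 1, 1), 0),
    ('b', date(2020, 2, 1), 0),
    ('b', date(2020, 3, 1), 0),
    ('b', date(2020, 5, 1), 1),
]
result = drop_near_targets(rows, window=2)
print([d.isoformat() for _, d, _ in result])
# ['2020-01-01', '2020-02-01', '2020-05-01']
```

February survives because it is three months away from May, even though it is only two rows away. That gap between row distance and value distance is what the lag/lead approach misses.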