PySpark window function to check a lagged row based on a condition

I need to pick up STAT 200, if it is present. When it is the last status I just want to take its date; for that case I know what to do, so that part is fine.

But first I need to check whether it is the last status. If it is not the last one, I still want to take the date of the STAT 200 row, but only if the next status is not a lower one (for example the 150 in the first case below); that situation I don't want to consider.

How can I do this in PySpark? Many thanks.

ID  STAT  TIME
1   100   17/11/2021
1   200   18/11/2021
1   150   19/11/2021
1   100   20/11/2021
2   200   20/11/2021

In this case I don't want to consider ID 1 (the status after 200 is 150 < 200), so I only take ID 2.

ID  STAT  TIME
1   100   17/11/2021
1   200   18/11/2021
1   350   19/11/2021
1   400   20/11/2021
2   200   20/11/2021

In this case I want to consider the STAT 200 rows and their details (ID 1: the next status is 350 > 200; ID 2: 200 is the only status).

Thanks for your support!

You can specify a default value for the lead function and then use the same logic to handle both the last row with STAT = 200 and a non-last row with STAT = 200. On the last row of each partition, lead returns that default (201 here), which is greater than 200, so a trailing STAT = 200 row passes the same filter as a row whose actual next STAT is above 200.


from pyspark.sql import functions as F
from pyspark.sql import Window

window_spec = Window.partitionBy("ID").orderBy("TIME")

# LEAD_STAT is the next STAT within the same ID; on the last row of a
# partition lead() returns the default 201, so a trailing STAT = 200 row
# also passes the filter below.
df.withColumn("LEAD_STAT", F.lead("STAT", default=201).over(window_spec)) \
  .filter((F.col("STAT") == 200) & (F.col("LEAD_STAT") > 200)) \
  .show()

Examples

Scenario 1


from datetime import datetime
from pyspark.sql import functions as F
from pyspark.sql import Window

data = [
    (1, 100, datetime.strptime("17/11/2021", "%d/%m/%Y")),
    (1, 200, datetime.strptime("18/11/2021", "%d/%m/%Y")),
    (1, 150, datetime.strptime("19/11/2021", "%d/%m/%Y")),
    (1, 100, datetime.strptime("20/11/2021", "%d/%m/%Y")),
    (2, 200, datetime.strptime("20/11/2021", "%d/%m/%Y")),
]

# `spark` is the SparkSession, e.g. the one provided by the pyspark shell.
df = spark.createDataFrame(data, ("ID", "STAT", "TIME"))

window_spec = Window.partitionBy("ID").orderBy("TIME")

df.withColumn("LEAD_STAT", F.lead("STAT", default=201).over(window_spec)) \
  .filter((F.col("STAT") == 200) & (F.col("LEAD_STAT") > 200)) \
  .show()

Output

+---+----+-------------------+---------+
| ID|STAT|               TIME|LEAD_STAT|
+---+----+-------------------+---------+
|  2| 200|2021-11-20 00:00:00|      201|
+---+----+-------------------+---------+

Scenario 2

from datetime import datetime
from pyspark.sql import functions as F
from pyspark.sql import Window


data = [
    (1, 100, datetime.strptime("17/11/2021", "%d/%m/%Y")),
    (1, 200, datetime.strptime("18/11/2021", "%d/%m/%Y")),
    (1, 350, datetime.strptime("19/11/2021", "%d/%m/%Y")),
    (1, 400, datetime.strptime("20/11/2021", "%d/%m/%Y")),
    (2, 200, datetime.strptime("20/11/2021", "%d/%m/%Y")),
]

df = spark.createDataFrame(data, ("ID", "STAT", "TIME"))

window_spec = Window.partitionBy("ID").orderBy("TIME")

df.withColumn("LEAD_STAT", F.lead("STAT", default=201).over(window_spec)) \
  .filter((F.col("STAT") == 200) & (F.col("LEAD_STAT") > 200)) \
  .show()

Output

+---+----+-------------------+---------+
| ID|STAT|               TIME|LEAD_STAT|
+---+----+-------------------+---------+
|  1| 200|2021-11-18 00:00:00|      350|
|  2| 200|2021-11-20 00:00:00|      201|
+---+----+-------------------+---------+
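Since the question ultimately only needs the date of the qualifying STAT 200 row, a minimal follow-up (assuming the same df and window_spec as above) would keep just ID and TIME:

# Keep only the date of the qualifying STAT = 200 row per ID.
(df.withColumn("LEAD_STAT", F.lead("STAT", default=201).over(window_spec))
   .filter((F.col("STAT") == 200) & (F.col("LEAD_STAT") > 200))
   .select("ID", "TIME")
   .show())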