PySpark Window function to check a lagged row based on a condition
I need to pick up the STAT = 200 row, if there is one.
If it is the last status for an ID, I just take its date; I know how to handle that case, so that's fine.
But I first need to check whether it is the last status.
If it is not the last one, I still want to take the date of the STAT = 200 row, but only if the next status is not lower (e.g., 150 in the first case below); that case I do not want to consider.
How can I do this in PySpark? Many thanks.
| ID | STAT | TIME       |
|----|------|------------|
| 1  | 100  | 17/11/2021 |
| 1  | 200  | 18/11/2021 |
| 1  | 150  | 19/11/2021 |
| 1  | 100  | 20/11/2021 |
| 2  | 200  | 20/11/2021 |
In this case I do not want to consider ID 1 (the status after 200 is 150 < 200), so I only take ID 2.
| ID | STAT | TIME       |
|----|------|------------|
| 1  | 100  | 17/11/2021 |
| 1  | 200  | 18/11/2021 |
| 1  | 350  | 19/11/2021 |
| 1  | 400  | 20/11/2021 |
| 2  | 200  | 20/11/2021 |
In this case I want to consider the STAT = 200 row and its details for both IDs (for ID 1 the next status is 350 > 200; for ID 2 there is only the 200 row).
Thanks for your support!
You can specify a default value for the lead function; the same logic then handles both a last row with STAT = 200 and a non-last row with STAT = 200.
from pyspark.sql import functions as F
from pyspark.sql import Window

window_spec = Window.partitionBy("ID").orderBy("TIME")

# default=201 is returned for the last row of each ID, so a trailing STAT = 200 row also passes the LEAD_STAT > 200 check
(df.withColumn("LEAD_STAT", F.lead("STAT", default=201).over(window_spec))
   .filter((F.col("STAT") == 200) & (F.col("LEAD_STAT") > 200))
   .show())
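If you prefer not to rely on a sentinel default, here is a minimal sketch of the same logic (assuming the same df, window_spec, and column names as above) that leaves lead at its null default and treats a missing next row as acceptable:

from pyspark.sql import functions as F
from pyspark.sql import Window

window_spec = Window.partitionBy("ID").orderBy("TIME")

# LEAD_STAT is null on the last row of each ID, so accept either "no next row" or a higher next status
(df.withColumn("LEAD_STAT", F.lead("STAT").over(window_spec))
   .filter((F.col("STAT") == 200) &
           (F.col("LEAD_STAT").isNull() | (F.col("LEAD_STAT") > 200)))
   .show())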
Example
Scenario 1
from datetime import datetime
from pyspark.sql import functions as F
from pyspark.sql import Window
data = [(1, 100, datetime.strptime("17/11/2021", "%d/%m/%Y")),
        (1, 200, datetime.strptime("18/11/2021", "%d/%m/%Y")),
        (1, 150, datetime.strptime("19/11/2021", "%d/%m/%Y")),
        (1, 100, datetime.strptime("20/11/2021", "%d/%m/%Y")),
        (2, 200, datetime.strptime("20/11/2021", "%d/%m/%Y"))]

df = spark.createDataFrame(data, ("ID", "STAT", "TIME"))
window_spec = Window.partitionBy("ID").orderBy("TIME")

(df.withColumn("LEAD_STAT", F.lead("STAT", default=201).over(window_spec))
   .filter((F.col("STAT") == 200) & (F.col("LEAD_STAT") > 200))
   .show())
Output
+---+----+-------------------+---------+
| ID|STAT| TIME|LEAD_STAT|
+---+----+-------------------+---------+
| 2| 200|2021-11-20 00:00:00| 201|
+---+----+-------------------+---------+
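Only ID 2 is kept here: for ID 1 the row after STAT = 200 has STAT = 150, which is not greater than 200, so the filter drops it.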
Scenario 2
from datetime import datetime
from pyspark.sql import functions as F
from pyspark.sql import Window
data = [(1, 100, datetime.strptime("17/11/2021", "%d/%m/%Y")),
        (1, 200, datetime.strptime("18/11/2021", "%d/%m/%Y")),
        (1, 350, datetime.strptime("19/11/2021", "%d/%m/%Y")),
        (1, 400, datetime.strptime("20/11/2021", "%d/%m/%Y")),
        (2, 200, datetime.strptime("20/11/2021", "%d/%m/%Y"))]

df = spark.createDataFrame(data, ("ID", "STAT", "TIME"))
window_spec = Window.partitionBy("ID").orderBy("TIME")

(df.withColumn("LEAD_STAT", F.lead("STAT", default=201).over(window_spec))
   .filter((F.col("STAT") == 200) & (F.col("LEAD_STAT") > 200))
   .show())
Output
+---+----+-------------------+---------+
| ID|STAT| TIME|LEAD_STAT|
+---+----+-------------------+---------+
| 1| 200|2021-11-18 00:00:00| 350|
| 2| 200|2021-11-20 00:00:00| 201|
+---+----+-------------------+---------+
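Since the question only needs the date of the qualifying STAT = 200 row, a short follow-up on either scenario (a sketch assuming the same df and window_spec; STAT_200_TIME is just an illustrative alias) would be:

# keep only the ID and the date of the qualifying STAT = 200 row
result = (df.withColumn("LEAD_STAT", F.lead("STAT", default=201).over(window_spec))
            .filter((F.col("STAT") == 200) & (F.col("LEAD_STAT") > 200))
            .select("ID", F.col("TIME").alias("STAT_200_TIME")))
result.show()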