PySpark slice dataset adding a column until a condition

I need to add a column containing the start date, based on this condition:

STATUS.isin('CREATED', 'CREATED AGAIN')

For each ID, starting from the date where the status is CREATED or CREATED AGAIN, I want to reproduce the dataset, adding a new column that partitions the TIME window.

For example:

Input:

ID $ STATUS TIME
1 050 CREATED 2021-11-01
1 120 ATTEMPTING 2021-11-01
1 130 VALID 2021-11-01
1 200 OK 2021-11-02
1 100 CREATED AGAIN 2021-11-03
1 160 OK 2021-11-03
1 200 ONGOING 2021-11-04
1 300 OK 2021-11-05
1 1000 FINAL 2021-11-06
2 ... ... ...

Output:

ID $ STATUS TIME START_DATE
1 050 CREATED 2021-11-01 2021-11-01
1 120 ATTEMPTING 2021-11-01 2021-11-01
1 130 VALID 2021-11-01 2021-11-01
1 200 OK 2021-11-02 2021-11-01
1 100 CREATED AGAIN 2021-11-03 2021-11-03
1 160 OK 2021-11-03 2021-11-03
1 200 ONGOING 2021-11-04 2021-11-03
1 300 OK 2021-11-05 2021-11-03
1 1000 FINAL 2021-11-06 2021-11-03
2 ... ... ... ...

You can add a column group using a cumulative conditional sum, then create the column START_DATE as first("TIME") over a Window partitioned by ID + group:

from pyspark.sql import functions as F, Window

w1 = Window.partitionBy("ID").orderBy("TIME")
w2 = Window.partitionBy("ID", "group").orderBy("TIME")

df1 = df.withColumn(
    "group",
    # cumulative conditional sum: +1 at every CREATED / CREATED AGAIN row;
    # non-matching rows evaluate to NULL, which sum ignores
    F.sum(F.when(F.col("STATUS").isin(['CREATED', 'CREATED AGAIN']), 1)).over(w1)
).withColumn(
    "START_DATE",
    # first TIME within each (ID, group) slice
    F.first("TIME").over(w2)
).drop("group")

df1.show()
#+---+----+-------------+----------+----------+
#| ID|   $|       STATUS|      TIME|START_DATE|
#+---+----+-------------+----------+----------+
#|  1| 050|      CREATED|2021-11-01|2021-11-01|
#|  1| 120|   ATTEMPTING|2021-11-01|2021-11-01|
#|  1| 130|        VALID|2021-11-01|2021-11-01|
#|  1| 200|           OK|2021-11-02|2021-11-01|
#|  1| 100|CREATED AGAIN|2021-11-03|2021-11-03|
#|  1| 160|           OK|2021-11-03|2021-11-03|
#|  1| 200|      ONGOING|2021-11-04|2021-11-03|
#|  1| 300|           OK|2021-11-05|2021-11-03|
#|  1|1000|        FINAL|2021-11-06|2021-11-03|
#+---+----+-------------+----------+----------+
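The cumulative conditional sum is what slices each ID into segments: every CREATED / CREATED AGAIN row bumps a counter, and all following rows inherit that counter until the next start. Here is a minimal pure-Python sketch of the same idea on hypothetical sample rows for a single ID, with no Spark required:

```python
from itertools import accumulate

# Hypothetical sample rows for one ID: (STATUS, TIME)
rows = [
    ("CREATED", "2021-11-01"),
    ("ATTEMPTING", "2021-11-01"),
    ("VALID", "2021-11-01"),
    ("OK", "2021-11-02"),
    ("CREATED AGAIN", "2021-11-03"),
    ("OK", "2021-11-03"),
    ("ONGOING", "2021-11-04"),
    ("OK", "2021-11-05"),
    ("FINAL", "2021-11-06"),
]

# Cumulative conditional sum: +1 at every CREATED / CREATED AGAIN row,
# 0 otherwise -> [1, 1, 1, 1, 2, 2, 2, 2, 2]
groups = list(accumulate(
    1 if status in ("CREATED", "CREATED AGAIN") else 0
    for status, _ in rows
))

# first("TIME") per group: remember the TIME of the row that opened each group
start_of_group = {}
start_dates = []
for (status, time), g in zip(rows, groups):
    start_of_group.setdefault(g, time)
    start_dates.append(start_of_group[g])

print(start_dates)
# ['2021-11-01', '2021-11-01', '2021-11-01', '2021-11-01',
#  '2021-11-03', '2021-11-03', '2021-11-03', '2021-11-03', '2021-11-03']
```

This mirrors the PySpark version: `groups` plays the role of `F.sum(F.when(...)).over(w1)`, and the `start_of_group` lookup plays the role of `F.first("TIME").over(w2)`.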