PySpark: slice a dataset, adding a column until a condition is met
I have to add a column containing the start date, based on a certain condition:
STATUS.isin('CREATED', 'CREATED AGAIN')
For each ID, I want to start from the date where the status is CREATED or CREATED AGAIN and reproduce the dataset, adding a new column partitioned over the TIME window.
For example:
Input:
ID | $ | STATUS | TIME |
---|---|---|---|
1 | 050 | CREATED | 2021-11-01 |
1 | 120 | ATTEMPTING | 2021-11-01 |
1 | 130 | VALID | 2021-11-01 |
1 | 200 | OK | 2021-11-02 |
1 | 100 | CREATED AGAIN | 2021-11-03 |
1 | 160 | OK | 2021-11-03 |
1 | 200 | ONGOING | 2021-11-04 |
1 | 300 | OK | 2021-11-05 |
1 | 1000 | FINAL | 2021-11-06 |
2 | ... | ... | ... |
Output:
ID | $ | STATUS | TIME | START_DATE |
---|---|---|---|---|
1 | 050 | CREATED | 2021-11-01 | 2021-11-01 |
1 | 120 | ATTEMPTING | 2021-11-01 | 2021-11-01 |
1 | 130 | VALID | 2021-11-01 | 2021-11-01 |
1 | 200 | OK | 2021-11-02 | 2021-11-01 |
1 | 100 | CREATED AGAIN | 2021-11-03 | 2021-11-03 |
1 | 160 | OK | 2021-11-03 | 2021-11-03 |
1 | 200 | ONGOING | 2021-11-04 | 2021-11-03 |
1 | 300 | OK | 2021-11-05 | 2021-11-03 |
1 | 1000 | FINAL | 2021-11-06 | 2021-11-03 |
2 | ... | ... | ... | ... |
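For reference, here is a minimal sketch that builds the sample input above (only the ID 1 rows are shown; keeping TIME as yyyy-MM-dd strings is an assumption for illustration, since they still sort correctly in that format):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample input taken from the table above; TIME is left as a string.
df = spark.createDataFrame(
    [
        (1, "050", "CREATED", "2021-11-01"),
        (1, "120", "ATTEMPTING", "2021-11-01"),
        (1, "130", "VALID", "2021-11-01"),
        (1, "200", "OK", "2021-11-02"),
        (1, "100", "CREATED AGAIN", "2021-11-03"),
        (1, "160", "OK", "2021-11-03"),
        (1, "200", "ONGOING", "2021-11-04"),
        (1, "300", "OK", "2021-11-05"),
        (1, "1000", "FINAL", "2021-11-06"),
    ],
    ["ID", "$", "STATUS", "TIME"],
)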
You can use a cumulative conditional sum to add a group column, then create the START_DATE column as first("TIME") over a Window partitioned by ID + group:
from pyspark.sql import functions as F, Window

# w1: per-ID window ordered by TIME, for the cumulative conditional sum
w1 = Window.partitionBy("ID").orderBy("TIME")
# w2: per-(ID, group) window, to fetch the first TIME of each group
w2 = Window.partitionBy("ID", "group").orderBy("TIME")

df1 = df.withColumn(
    "group",
    # increment the counter at every CREATED / CREATED AGAIN row; other rows
    # produce null, which sum ignores, so they keep the running group number
    F.sum(F.when(F.col("STATUS").isin(['CREATED', 'CREATED AGAIN']), 1)).over(w1)
).withColumn(
    "START_DATE",
    F.first("TIME").over(w2)  # first TIME within each (ID, group) slice
).drop("group")
df1.show()
#+---+----+-------------+----------+----------+
#| ID| $| STATUS| TIME|START_DATE|
#+---+----+-------------+----------+----------+
#| 1| 050| CREATED|2021-11-01|2021-11-01|
#| 1| 120| ATTEMPTING|2021-11-01|2021-11-01|
#| 1| 130| VALID|2021-11-01|2021-11-01|
#| 1| 200| OK|2021-11-02|2021-11-01|
#| 1| 100|CREATED AGAIN|2021-11-03|2021-11-03|
#| 1| 160| OK|2021-11-03|2021-11-03|
#| 1| 200| ONGOING|2021-11-04|2021-11-03|
#| 1| 300| OK|2021-11-05|2021-11-03|
#| 1|1000| FINAL|2021-11-06|2021-11-03|
#+---+----+-------------+----------+----------+
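To see why this works, you can inspect the intermediate group column before it is dropped; a quick sketch, reusing df and w1 from above:

# Each CREATED / CREATED AGAIN row increments the counter, so the rows
# before CREATED AGAIN fall in group 1 and the rows from it onward in group 2.
df.withColumn(
    "group",
    F.sum(F.when(F.col("STATUS").isin(['CREATED', 'CREATED AGAIN']), 1)).over(w1)
).select("ID", "STATUS", "TIME", "group").show()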