PySpark: Update a value in a row with another row's value based on a condition?
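I need to update some columns per unique ticket Id (groupBy), according to the conditions below, and return the resulting records:
1. Whenever the status is Closed, that record's Run_date should be written to the closed_time column of the Closed-status record for the same ticket Id.
2. Whenever the status is In_Progress, that record's Run_date should be written to the inprogress_time column of the Closed-status record for the same ticket Id (only the Run_date goes into inprogress_time of the Closed record).
3. Whenever the status is Cancelled, that record's Run_date should be written to the cancelled_time column of the Cancelled-status record for the same ticket Id.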
INPUT DATAFRAME
Id  type   inprogress_time  closed_time  cancelled_time  status       Source_system  Run_date
11  TRUCK  NAN              NAN          NAN             Created      LIBERATE       1/9/2021 12:00
11  TRUCK  NAN              NAN          NAN             In_Progress  LIBERATE       1/9/2021 12:00
11  TRUCK  NAN              NAN          NAN             Closed       LIBERATE       8/9/2021 19:21
22  TRUCK  NAN              NAN          NAN             Cancelled    LIBERATE       3/9/2021 15:08
33  TRUCK  NAN              NAN          NAN             Created      LIBERATE       4/10/2021 15:08
33  TRUCK  NAN              NAN          NAN             In_Progress  LIBERATE       4/10/2021 15:08
33  TRUCK  NAN              NAN          NAN             Closed       LIBERATE       5/10/2021 15:08
EXPECTED RESULT (OUTPUT DATAFRAME)
Id  type   inprogress_time  closed_time      cancelled_time  status     Source_system  run_date
11  TRUCK  1/9/2021 12:00   8/9/2021 19:21   NAN             Closed     LIBERATE       8/9/2021 19:21
22  TRUCK  NAN              NAN              3/9/2021 15:08  Cancelled  LIBERATE       3/9/2021 15:08
33  TRUCK  4/10/2021 15:08  5/10/2021 15:08  NAN             Closed     LIBERATE       5/10/2021 15:08
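I think a pivot is more efficient here: group by the ticket columns, pivot on status, and take Run_date as the value.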
from pyspark.sql import functions as f

# One row per ticket; each status value becomes a column holding that status's Run_date.
# The final status is derived from whichever timestamp columns are filled in.
df.groupBy('Id', 'type', 'Source_system').pivot('status').agg(f.first('Run_date')) \
    .withColumnRenamed('Cancelled', 'cancelled_time') \
    .withColumnRenamed('Closed', 'closed_time') \
    .withColumnRenamed('In_Progress', 'inprogress_time') \
    .drop('Created') \
    .withColumn('status', f.expr('''
        CASE WHEN cancelled_time is not null THEN 'Cancelled'
             WHEN closed_time is not null THEN 'Closed'
             WHEN inprogress_time is not null THEN 'In_Progress'
             ELSE 'Created' END ''')) \
    .show(truncate=False)
+---+-----+-------------+--------------+---------------+---------------+---------+
|Id |type |Source_system|cancelled_time|closed_time    |inprogress_time|status   |
+---+-----+-------------+--------------+---------------+---------------+---------+
|33 |TRUCK|LIBERATE     |null          |5/10/2021 15:08|4/10/2021 15:08|Closed   |
|11 |TRUCK|LIBERATE     |null          |8/9/2021 19:21 |1/9/2021 12:00 |Closed   |
|22 |TRUCK|LIBERATE     |3/9/2021 15:08|null           |null           |Cancelled|
+---+-----+-------------+--------------+---------------+---------------+---------+