PYSPARK : Update value in a row with another row value based on condition?

I need to update some columns per unique ticket ID (groupby) according to the conditions below, and return one specific record per ticket:

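1. Whenever the status is Closed, only that record's run_date needs to be written to the closed_time column of the Closed-status record, based on the unique ticket ID.

2. Whenever the status is In_Progress, only that record's run_date needs to be written to the inprogress_time column of the Closed-status record, based on the unique ticket ID (only run_date gets written into inprogress_time of the Closed-status record).

3. Whenever the status is Cancelled, only that record's run_date needs to be written to the cancelled_time column of the Cancelled-status record, based on the unique ticket ID.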
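The three rules above can be sketched in plain Python, without Spark, to make the intended result concrete. This is only an illustration: `collapse_tickets` and `STATUS_TO_COLUMN` are hypothetical names, and the sketch assumes the records of each ticket arrive in chronological order, so the last record seen carries the final status and run_date.

```python
# Hypothetical mapping from a status to the timestamp column it fills
# (column names taken from the question).
STATUS_TO_COLUMN = {
    "In_Progress": "inprogress_time",
    "Closed": "closed_time",
    "Cancelled": "cancelled_time",
}


def collapse_tickets(records):
    """Collapse many status rows per ticket into one row per ticket.

    Assumes each ticket's records are in chronological order.
    """
    tickets = {}
    for rec in records:
        out = tickets.setdefault(rec["Id"], {
            "Id": rec["Id"],
            "type": rec["type"],
            "inprogress_time": None,
            "closed_time": None,
            "cancelled_time": None,
            "Source_system": rec["Source_system"],
        })
        # Copy Run_date into the column that belongs to this status, if any.
        column = STATUS_TO_COLUMN.get(rec["status"])
        if column is not None:
            out[column] = rec["Run_date"]
        # The latest record seen decides the final status and run_date.
        out["status"] = rec["status"]
        out["run_date"] = rec["Run_date"]
    return list(tickets.values())


# Tickets 11 and 22 from the input table.
sample = [
    {"Id": 11, "type": "TRUCK", "status": "Created", "Source_system": "LIBERATE", "Run_date": "1/9/2021 12:00"},
    {"Id": 11, "type": "TRUCK", "status": "In_Progress", "Source_system": "LIBERATE", "Run_date": "1/9/2021 12:00"},
    {"Id": 11, "type": "TRUCK", "status": "Closed", "Source_system": "LIBERATE", "Run_date": "8/9/2021 19:21"},
    {"Id": 22, "type": "TRUCK", "status": "Cancelled", "Source_system": "LIBERATE", "Run_date": "3/9/2021 15:08"},
]
result = collapse_tickets(sample)
for row in result:
    print(row)
```

Each ticket ends up as a single row whose time columns are filled from the rows with the matching status, which is the shape of the expected output.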

INPUT DATAFRAME

Id  type   inprogress_time  closed_time  cancelled_time  status       Source_system  Run_date
11  TRUCK  NAN              NAN          NAN             Created      LIBERATE       1/9/2021 12:00
11  TRUCK  NAN              NAN          NAN             In_Progress  LIBERATE       1/9/2021 12:00
11  TRUCK  NAN              NAN          NAN             Closed       LIBERATE       8/9/2021 19:21
22  TRUCK  NAN              NAN          NAN             Cancelled    LIBERATE       3/9/2021 15:08
33  TRUCK  NAN              NAN          NAN             Created      LIBERATE       4/10/2021 15:08
33  TRUCK  NAN              NAN          NAN             In_Progress  LIBERATE       4/10/2021 15:08
33  TRUCK  NAN              NAN          NAN             Closed       LIBERATE       5/10/2021 15:08

EXPECTED RESULT(OUTPUT DATAFRAME)

Id  type   inprogress_time  closed_time      cancelled_time  status     Source_system  run_date
11  TRUCK  1/9/2021 12:00   8/9/2021 19:21   NAN             Closed     LIBERATE       8/9/2021 19:21
22  TRUCK  NAN              NAN              3/9/2021 15:08  Cancelled  LIBERATE       3/9/2021 15:08
33  TRUCK  4/10/2021 15:08  5/10/2021 15:08  NAN             Closed     LIBERATE       5/10/2021 15:08

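I think a pivot is more efficient here: group by the ticket columns, pivot on status, and take Run_date as the value of each status column.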

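from pyspark.sql import functions as f

# One row per ticket: each distinct status becomes a column holding that status's Run_date.
# The columns are then renamed to the *_time names, and the final status is derived
# from whichever time column is populated (Cancelled first, then Closed, then In_Progress).
df.groupBy('Id', 'type', 'Source_system').pivot('status').agg(f.first('Run_date')) \
  .withColumnRenamed('Cancelled',   'cancelled_time') \
  .withColumnRenamed('Closed',      'closed_time') \
  .withColumnRenamed('In_Progress', 'inprogress_time') \
  .drop('Created') \
  .withColumn('status', f.expr('''
      CASE WHEN cancelled_time  is not null THEN 'Cancelled'
           WHEN closed_time     is not null THEN 'Closed'
           WHEN inprogress_time is not null THEN 'In_Progress'
      ELSE 'Created' END ''')) \
  .show(truncate=False)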

+---+-----+-------------+--------------+---------------+---------------+---------+
|Id |type |Source_system|cancelled_time|closed_time    |inprogress_time|status   |
+---+-----+-------------+--------------+---------------+---------------+---------+
|33 |TRUCK|LIBERATE     |null          |5/10/2021 15:08|4/10/2021 15:08|Closed   |
|11 |TRUCK|LIBERATE     |null          |8/9/2021 19:21 |1/9/2021 12:00 |Closed   |
|22 |TRUCK|LIBERATE     |3/9/2021 15:08|null           |null           |Cancelled|
+---+-----+-------------+--------------+---------------+---------------+---------+