Build a column from value of another column in Pyspark

I have a table as follows.

+----------+---------------------+-----------------+----------+--------------------------+--------------------------+-----------------+-----------------------+------------------+------------------------+
|cust_pr_id|cust_pr_name         |now_prcs_status  |pr_join_dt|installation_due          |installation_completed    |seg_purchase_due |seg_purchase_completed |wire_in_line_due  |wire_in_line_completed  |
+----------+---------------------+-----------------+----------+--------------------------+--------------------------+-----------------+-----------------------+------------------+------------------------+
|9822647220|Jonathan RM Berlin   |installation     |20200202  |2019-07-11 08:00:00.000000|2019-07-10 12:20:30.132000|1562745600000    |1562761216526          |                  |                        |
|7166582305|Paola RM Berlin      |seg purchase     |20200903  |2019-07-11 08:00:00.000000|2019-07-10 12:20:30.132000|1562745600000    |1562761216526          |                  |                        |
|9964201263|Roy RM Poland        |installation     |20201023  |2019-07-11 08:00:00.000000|2019-07-10 12:20:30.132000|1562745600000    |1562761216526          |                  |                        |
|7288402221|Katerina RM Mia      |wire in line     |20201110  |2019-07-11 08:00:00.000000|2019-07-10 12:20:30.132000|1562745600000    |1562761216526          |                  |                        |
|8424182826|Smidge RM Siberia    |seg purchase     |20200902  |2019-07-15 08:00:00.000000|2019-07-10 09:11:30.599000|                 |                       |                  |                        |
|4445859610|Donna RM Brazil      |seg purchase     |20200903  |2019-07-15 08:00:00.000000|2019-07-10 09:11:30.599000|                 |                       |                  |                        |
+----------+---------------------+-----------------+----------+--------------------------+--------------------------+-----------------+-----------------------+------------------+------------------------+

I want to build the dataset below from this data. Here, if the value of the field "now_prcs_status" is "installation", I need to populate the value of "installation_due" as "curr_prcs_due" and the value of "installation_completed" as "curr_prcs_completed". Similarly, if the value of "now_prcs_status" is "seg purchase", I need the value of "seg_purchase_due" as "curr_prcs_due" and the value of "seg_purchase_completed" as "curr_prcs_completed". The same applies when the value of "now_prcs_status" is "wire in line": I need to populate its due and completed values as "curr_prcs_due" and "curr_prcs_completed" respectively.

+----------+---------------------+-----------------+----------+--------------------------+--------------------------+
|cust_pr_id|cust_pr_name         |now_prcs_status  |pr_join_dt|curr_prcs_due             |curr_prcs_completed       |
+----------+---------------------+-----------------+----------+--------------------------+--------------------------+
|9822647220|Jonathan RM Berlin   |installation     |20200202  |2019-07-11 08:00:00.000000|2019-07-10 12:20:30.132000|
|7166582305|Paola RM Berlin      |seg purchase     |20200903  |1562745600000             |1562761216526             |
|9964201263|Roy RM Poland        |installation     |20201023  |2019-07-11 08:00:00.000000|2019-07-10 12:20:30.132000|
|7288402221|Katerina RM Mia      |wire in line     |20201110  |                          |                          |
|8424182826|Smidge RM Siberia    |seg purchase     |20200902  |                          |                          |
|4445859610|Donna RM Brazil      |seg purchase     |20200903  |                          |                          |
+----------+---------------------+-----------------+----------+--------------------------+--------------------------+

The above is the expected output.

I don't want to use SQL case statements, because in my real dataset now_prcs_status has 105 distinct values in total, and I don't want to end up writing 105 case statements.

Could someone help me with a way to achieve this in PySpark or Hive?

Thanks!
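For reference, a minimal sketch of how the sample data above could be created (all columns are assumed to be strings here, and only two of the six rows are shown):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Column order matters: the answer below slices df.columns by position.
schema = ('cust_pr_id string, cust_pr_name string, now_prcs_status string, pr_join_dt string, '
          'installation_due string, installation_completed string, '
          'seg_purchase_due string, seg_purchase_completed string, '
          'wire_in_line_due string, wire_in_line_completed string')

data = [
    ('9822647220', 'Jonathan RM Berlin', 'installation', '20200202',
     '2019-07-11 08:00:00.000000', '2019-07-10 12:20:30.132000',
     '1562745600000', '1562761216526', None, None),
    ('7288402221', 'Katerina RM Mia', 'wire in line', '20201110',
     '2019-07-11 08:00:00.000000', '2019-07-10 12:20:30.132000',
     '1562745600000', '1562761216526', None, None),
]

df = spark.createDataFrame(data, schema)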

A bit of list comprehension should do the job:

import pyspark.sql.functions as F

# Derive the process-name prefixes from the "*_due" columns
# (skip the first 4 info columns, then take every other column):
# ['installation', 'seg_purchase', 'wire_in_line']
prcs = [c[:-4] for c in df.columns[4::2]]

df2 = df.select(
    *df.columns[:4],
    # For each process, when() yields the "*_due" value if the status matches
    # (underscores replaced by spaces) and null otherwise;
    # coalesce() keeps the first non-null result.
    F.coalesce(*[
        F.when(
            F.col('now_prcs_status') == p.replace('_', ' '),
            F.col(p + '_due')
        )
        for p in prcs
    ]).alias('curr_prcs_due'),
    # Same logic for the "*_completed" columns.
    F.coalesce(*[
        F.when(
            F.col('now_prcs_status') == p.replace('_', ' '),
            F.col(p + '_completed')
        )
        for p in prcs
    ]).alias('curr_prcs_completed')
)

df2.show(truncate=False)
+----------+------------------+---------------+----------+--------------------------+--------------------------+
|cust_pr_id|cust_pr_name      |now_prcs_status|pr_join_dt|curr_prcs_due             |curr_prcs_completed       |
+----------+------------------+---------------+----------+--------------------------+--------------------------+
|9822647220|Jonathan RM Berlin|installation   |20200202  |2019-07-11 08:00:00.000000|2019-07-10 12:20:30.132000|
|7166582305|Paola RM Berlin   |seg purchase   |20200903  |1562745600000             |1562761216526             |
|9964201263|Roy RM Poland     |installation   |20201023  |2019-07-11 08:00:00.000000|2019-07-10 12:20:30.132000|
|7288402221|Katerina RM Mia   |wire in line   |20201110  |null                      |null                      |
|8424182826|Smidge RM Siberia |seg purchase   |20200902  |null                      |null                      |
|4445859610|Donna RM Brazil   |seg purchase   |20200903  |null                      |null                      |
+----------+------------------+---------------+----------+--------------------------+--------------------------+
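
This works because a when() without an otherwise() returns null for the rows where its condition is false, so coalesce() keeps the branch whose status matched (and stays null when the matching columns are themselves empty, as in the last three rows of the output). For the sample data, the generated expression for "curr_prcs_due" expands to the equivalent of:

F.coalesce(
    F.when(F.col('now_prcs_status') == 'installation', F.col('installation_due')),
    F.when(F.col('now_prcs_status') == 'seg purchase', F.col('seg_purchase_due')),
    F.when(F.col('now_prcs_status') == 'wire in line', F.col('wire_in_line_due'))
).alias('curr_prcs_due')

Because the process names are taken from the column names themselves, this scales to the 105 statuses without hand-writing any case statement, as long as each status string matches its column prefix with spaces in place of underscores.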