pyspark - 将非空列分配给新列

pyspark - assign non-null columns to new columns

我在 pyspark 中有以下方案的数据框:

  user_id  datadate       page_1.A   page_1.B  page_1.C  page_2.A  page_2.B  \
0      111  20220203         NaN       NaN      NaN      NaN       NaN   
1      222  20220203         5         5         5       5.0       5.0   
2      333  20220203         3         3         3       3.0       3.0   

     page_2.C  page_3.A  page_3.B  page_3.C  
0       NaN       1.0       1.0       2.0  
1       5.0       NaN       NaN       NaN  
2       4.0       NaN       NaN       NaN   

所以它包含像 user_id 这样的列、datadate 和每个页面的几列(有 3 页),这是 2 次连接的结果。在这个例子中,我有 page_1、page_2、page_3,每个都有 3 列:A、B、C。此外,对于每一页的列,对于每一行,它们要么全部为空,要么全部为满,就像在我的示例中一样。 我不关心每页每一列的值,我只想为每一行获取不为空的 [A,B,C] 值。

想要的结果示例 table:

  user_id  datadate  A  B  C
0      111  20220203  1  1  2
1      222  20220203  5  5  5
2      333  20220203  3  3  3

所以逻辑将是这样的:

df[A] = page_1.A or page_2.A or page_3.A, whichever is not null
df[B] = page_1.B or page_2.B or page_3.B, whichever is not null
df[C] = page_1.C or page_2.C or page_3.C, whichever is not null

对于所有行.. 当然,我想以一种有效的方式来做。 非常感谢。

您可以使用 sql 函数 greatest 提取列列表中的最大值。 您可以在此处找到文档:https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.greatest.html

from pyspark.sql import functions as F
(df.withColumn('A', F.greates(F.col('page_1.A'), F.col('page_2.A), F.col('page_3.A'))
   .withColumn('B', F.greates(F.col('page_1.B'), F.col('page_2.B), F.col('page_3.B'))
   .select('userid', 'datadate', 'A', 'B'))