How to get required dataframe after pyspark pivot?

I have a Spark dataframe:

+------+----------------+-------+------+---------+
|name  |           pDate| status|user  |userCount|
+------+----------------+-------+------+---------+
|janani|2022-03-04      |active |qwe123|1        |
|raj   |2022-03-03      |active |qwe123|1        |
|ram   |2022-03-01      |active |qwe123|1        |
|ram   |2022-03-02      |active |qwe123|1        |
+------+----------------+-------+------+---------+

When I pivot the dataframe with the following code

pvtcountuser = countuser.groupby('pDate','name').pivot('pDate').max('userCount').fillna(0)

I get:

+----------------+------+----------+----------+----------+----------+
|pDate           |name  |2022-03-01|2022-03-02|2022-03-03|2022-03-04|
+----------------+------+----------+----------+----------+----------+
|2022-03-04      |janani|0         |0         |0         |1         |
|2022-03-03      |raj   |0         |0         |1         |0         |
|2022-03-01      |ram   |1         |0         |0         |0         |
|2022-03-02      |ram   |0         |1         |0         |0         |
+----------------+------+----------+----------+----------+----------+

But the required output is:

+----------------+------+----------+----------+----------+----------+
|pDate           |name  |2022-03-01|2022-03-02|2022-03-03|2022-03-04|
+----------------+------+----------+----------+----------+----------+
|2022-03-04      |janani|0         |0         |0         |1         |
|2022-03-03      |raj   |0         |0         |1         |0         |
|2022-03-01      |ram   |1         |1         |0         |0         |
+----------------+------+----------+----------+----------+----------+

Please help me solve this.

The min(pDate) for each name can be computed with a window function before the pivot. Once you have it, you can aggregate by name and min_pDate:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

# recreate the sample data from the question
sql_stmt = '''
select *
from   values    ('janani',date '2022-03-04','active','qwe123',1)
                ,('raj'   ,date '2022-03-03','active','qwe123',1)
                ,('ram'   ,date '2022-03-01','active','qwe123',1)
                ,('ram'   ,date '2022-03-02','active','qwe123',1) as t (name,pDate,status,user,userCount)
'''

countuser = spark.sql(sql_stmt)

pvtcountuser = (countuser
                # earliest pDate per name, computed with a window function before the pivot
                .withColumn('min_pDate', F.min('pDate').over(Window.partitionBy('name')))
                # aggregate per name and its minimum date, pivoting on pDate
                .groupby('name', 'min_pDate')
                .pivot('pDate')
                .max('userCount')
                .fillna(0)
                .withColumnRenamed('min_pDate', 'pDate')
               )

pvtcountuser.show()

+------+----------+----------+----------+----------+----------+
|  name|     pDate|2022-03-01|2022-03-02|2022-03-03|2022-03-04|
+------+----------+----------+----------+----------+----------+
|janani|2022-03-04|         0|         0|         0|         1|
|   raj|2022-03-03|         0|         0|         1|         0|
|   ram|2022-03-01|         1|         1|         0|         0|
+------+----------+----------+----------+----------+----------+
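As a side note, a similar result can be reached by keeping the pivot from the question and collapsing the rows per name afterwards. This is only a sketch; the names pvt, date_cols and result are illustrative:

import pyspark.sql.functions as F

# pivot exactly as in the question: one row per (pDate, name)
pvt = countuser.groupby('pDate', 'name').pivot('pDate').max('userCount').fillna(0)

# all generated date columns
date_cols = [c for c in pvt.columns if c not in ('pDate', 'name')]

# collapse to one row per name: earliest pDate plus the max of every date column
result = (pvt
          .groupBy('name')
          .agg(F.min('pDate').alias('pDate'),
               *[F.max(c).alias(c) for c in date_cols]))

result.show()

The window-function approach in the answer avoids the second aggregation pass, so it is the simpler choice here.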