PySpark 在枢轴后连接列

PySpark join columns after pivot

对于以下示例 DataFrame:

df = spark.createDataFrame(
[
    ('2017-01-01', 'A', 1),
    ('2017-01-01', 'B', 2),
    ('2017-01-01', 'C', 3),
    ('2017-01-02', 'A', 4),
    ('2017-01-02', 'B', 5),
    ('2017-01-02', 'C', 6),
    ('2017-01-03', 'A', 7),
    ('2017-01-03', 'B', 8),
    ('2017-01-03', 'C', 9),
],
('date', 'type', 'value')
)

我想将其转换为所有列都等于 "types"(A、B 和 C)。

目前,我发现这段代码最接近我想要实现的目标:

 df.groupby("date", "type").pivot("type").sum().orderBy("date").show()

+----------+----+----+----+----+
|      date|type|   A|   B|   C|
+----------+----+----+----+----+
|2017-01-01|   C|null|null|   3|
|2017-01-01|   A|   1|null|null|
|2017-01-01|   B|null|   2|null|
|2017-01-02|   B|null|   5|null|
|2017-01-02|   C|null|null|   6|
|2017-01-02|   A|   4|null|null|
|2017-01-03|   A|   7|null|null|
|2017-01-03|   C|null|null|   9|
|2017-01-03|   B|null|   8|null|
+----------+----+----+----+----+

问题是我还有太多行(包含所有 "null")。

我想得到的是:

+----------+---+---+---+
|      date|  A|  B|  C|
+----------+---+---+---+
|2017-01-01|  1|  2|  3|
|2017-01-02|  4|  5|  6|
|2017-01-03|  7|  8|  9|
+----------+---+---+---+

Aka,我想要与 pandas.DataFrame.unstack() 具有相似功能的东西。

如果有人对我如何在 PySpark 中实现这一目标有任何提示,那就太好了。

您需要按 "date" 列再做一组,然后 select max 来自 A,B,C.

的值

Example:

df.groupby("date", "type").pivot("type").sum().orderBy("date").groupBy("date").agg(max(col("A")).alias("A"),max(col("B")).

#+----------+---+---+---+
#|      date|  A|  B|  c|
#+----------+---+---+---+
#|2017-01-01|  1|  2|  3|
#|2017-01-02|  4|  5|  6|
#|2017-01-03|  7|  8|  9|
#+----------+---+---+---+

# dynamic way 
aggregate = ["A","B","C"]
funs=[max]
exprs=[f(col(c)).alias(c) for f in funs for c in aggregate]
df.groupby("date", "type").pivot("type").sum().orderBy("date").groupBy("date").agg(*exprs).show()

#+----------+---+---+---+
#|      date|  A|  B|  c|
#+----------+---+---+---+
#|2017-01-01|  1|  2|  3|
#|2017-01-02|  4|  5|  6|
#|2017-01-03|  7|  8|  9|
#+----------+---+---+---+