PySpark 在枢轴后连接列
PySpark join columns after pivot
对于以下示例 DataFrame:
df = spark.createDataFrame(
[
('2017-01-01', 'A', 1),
('2017-01-01', 'B', 2),
('2017-01-01', 'C', 3),
('2017-01-02', 'A', 4),
('2017-01-02', 'B', 5),
('2017-01-02', 'C', 6),
('2017-01-03', 'A', 7),
('2017-01-03', 'B', 8),
('2017-01-03', 'C', 9),
],
('date', 'type', 'value')
)
我想将其转换为所有列都等于 "types"(A、B 和 C)。
目前,我发现这段代码最接近我想要实现的目标:
df.groupby("date", "type").pivot("type").sum().orderBy("date").show()
+----------+----+----+----+----+
| date|type| A| B| C|
+----------+----+----+----+----+
|2017-01-01| C|null|null| 3|
|2017-01-01| A| 1|null|null|
|2017-01-01| B|null| 2|null|
|2017-01-02| B|null| 5|null|
|2017-01-02| C|null|null| 6|
|2017-01-02| A| 4|null|null|
|2017-01-03| A| 7|null|null|
|2017-01-03| C|null|null| 9|
|2017-01-03| B|null| 8|null|
+----------+----+----+----+----+
问题是我还有太多行(包含所有 "null")。
我想得到的是:
+----------+---+---+---+
| date| A| B| C|
+----------+---+---+---+
|2017-01-01| 1| 2| 3|
|2017-01-02| 4| 5| 6|
|2017-01-03| 7| 8| 9|
+----------+---+---+---+
Aka,我想要与 pandas.DataFrame.unstack() 具有相似功能的东西。
如果有人对我如何在 PySpark 中实现这一目标有任何提示,那就太好了。
您需要按 "date"
列再做一组,然后 select max
来自 A,B,C
.
的值
Example:
df.groupby("date", "type").pivot("type").sum().orderBy("date").groupBy("date").agg(max(col("A")).alias("A"),max(col("B")).
#+----------+---+---+---+
#| date| A| B| c|
#+----------+---+---+---+
#|2017-01-01| 1| 2| 3|
#|2017-01-02| 4| 5| 6|
#|2017-01-03| 7| 8| 9|
#+----------+---+---+---+
# dynamic way
aggregate = ["A","B","C"]
funs=[max]
exprs=[f(col(c)).alias(c) for f in funs for c in aggregate]
df.groupby("date", "type").pivot("type").sum().orderBy("date").groupBy("date").agg(*exprs).show()
#+----------+---+---+---+
#| date| A| B| c|
#+----------+---+---+---+
#|2017-01-01| 1| 2| 3|
#|2017-01-02| 4| 5| 6|
#|2017-01-03| 7| 8| 9|
#+----------+---+---+---+
对于以下示例 DataFrame:
df = spark.createDataFrame(
[
('2017-01-01', 'A', 1),
('2017-01-01', 'B', 2),
('2017-01-01', 'C', 3),
('2017-01-02', 'A', 4),
('2017-01-02', 'B', 5),
('2017-01-02', 'C', 6),
('2017-01-03', 'A', 7),
('2017-01-03', 'B', 8),
('2017-01-03', 'C', 9),
],
('date', 'type', 'value')
)
我想将其转换为所有列都等于 "types"(A、B 和 C)。
目前,我发现这段代码最接近我想要实现的目标:
df.groupby("date", "type").pivot("type").sum().orderBy("date").show()
+----------+----+----+----+----+
| date|type| A| B| C|
+----------+----+----+----+----+
|2017-01-01| C|null|null| 3|
|2017-01-01| A| 1|null|null|
|2017-01-01| B|null| 2|null|
|2017-01-02| B|null| 5|null|
|2017-01-02| C|null|null| 6|
|2017-01-02| A| 4|null|null|
|2017-01-03| A| 7|null|null|
|2017-01-03| C|null|null| 9|
|2017-01-03| B|null| 8|null|
+----------+----+----+----+----+
问题是我还有太多行(包含所有 "null")。
我想得到的是:
+----------+---+---+---+
| date| A| B| C|
+----------+---+---+---+
|2017-01-01| 1| 2| 3|
|2017-01-02| 4| 5| 6|
|2017-01-03| 7| 8| 9|
+----------+---+---+---+
Aka,我想要与 pandas.DataFrame.unstack() 具有相似功能的东西。
如果有人对我如何在 PySpark 中实现这一目标有任何提示,那就太好了。
您需要按 "date"
列再做一组,然后 select max
来自 A,B,C
.
Example:
df.groupby("date", "type").pivot("type").sum().orderBy("date").groupBy("date").agg(max(col("A")).alias("A"),max(col("B")).
#+----------+---+---+---+
#| date| A| B| c|
#+----------+---+---+---+
#|2017-01-01| 1| 2| 3|
#|2017-01-02| 4| 5| 6|
#|2017-01-03| 7| 8| 9|
#+----------+---+---+---+
# dynamic way
aggregate = ["A","B","C"]
funs=[max]
exprs=[f(col(c)).alias(c) for f in funs for c in aggregate]
df.groupby("date", "type").pivot("type").sum().orderBy("date").groupBy("date").agg(*exprs).show()
#+----------+---+---+---+
#| date| A| B| c|
#+----------+---+---+---+
#|2017-01-01| 1| 2| 3|
#|2017-01-02| 4| 5| 6|
#|2017-01-03| 7| 8| 9|
#+----------+---+---+---+