Spark pivot one column but keep others intact
Given the following DataFrame, how can I pivot the max Score per game while also aggregating the sum of Plays?
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import functions as F
from pyspark.sql import Window

# Create the contexts if they are not already provided (e.g. by the pyspark shell)
sc = SparkContext.getOrCreate()
sqlContext = HiveContext(sc)

df = sqlContext.createDataFrame([
    ("u1", "g1", 10, 0, 1),
    ("u1", "g3", 2, 2, 1),
    ("u1", "g3", 5, 3, 1),
    ("u1", "g4", 5, 4, 1),
    ("u2", "g2", 1, 1, 1),
], ["UserID", "GameID", "Score", "Time", "Plays"])
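(Side note: the setup above is Spark 1.x style; on Spark 2.x and later the SQLContext/HiveContext entry points are superseded by SparkSession, so an equivalent setup would look roughly like this sketch:)

# Spark 2.x+ entry point (sketch); a plain SparkSession is enough for this example
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([
    ("u1", "g1", 10, 0, 1),
    ("u1", "g3", 2, 2, 1),
    ("u1", "g3", 5, 3, 1),
    ("u1", "g4", 5, 4, 1),
    ("u2", "g2", 1, 1, 1),
], ["UserID", "GameID", "Score", "Time", "Plays"])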
Desired output:
+------+-------------+-------------+-----+
|UserID|MaxScoreGame1|MaxScoreGame2|Plays|
+------+-------------+-------------+-----+
| u1| 10| 5| 4|
| u2| 1| null| 1|
+------+-------------+-------------+-----+
I have posted a solution below, but I would like to avoid using a join.
Here is the join-based solution I would like to avoid:
Aggregate DataFrame
df_sum = df.groupBy("UserID").agg(F.sum("Plays").alias("Plays")).alias("df_sum")
df_sum.show()
+------+-----+
|UserID|Plays|
+------+-----+
| u1| 4|
| u2| 1|
+------+-----+
Pivot DataFrame
rowNumberWindow = Window.partitionBy("UserID").orderBy(F.col("Time"))
df_piv = (df
    .groupBy("UserID", "GameID")
    .agg(F.sum("Plays").alias("Plays"),
         F.max("Score").alias("Score"),
         F.min("Time").alias("Time"))
    .select(F.col("*"),
            F.row_number().over(rowNumberWindow).alias("GameNumber"))
    .filter(F.col("GameNumber") <= F.lit(2))
    .withColumn("GameCol", F.concat(F.lit("MaxScoreGame"), F.col("GameNumber")))
    .groupBy("UserID")
    .pivot("GameCol", ["MaxScoreGame1", "MaxScoreGame2"])
    .agg(F.max("Score"))
).alias("df_piv")
df_piv.show()
+------+-------------+-------------+
|UserID|MaxScoreGame1|MaxScoreGame2|
+------+-------------+-------------+
| u1| 10| 5|
| u2| 1| null|
+------+-------------+-------------+
Join DataFrames
df_joined = df_sum.join(df_piv, F.col("df_sum.UserID") == F.col("df_piv.UserID"))
df_joined.show()
+------+-----+------+-------------+-------------+
|UserID|Plays|UserID|MaxScoreGame1|MaxScoreGame2|
+------+-----+------+-------------+-------------+
| u1| 4| u1| 10| 5|
| u2| 1| u2| 1| null|
+------+-----+------+-------------+-------------+
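(As an aside, if you do keep the join, passing the column name as the join key instead of an equality expression lets Spark deduplicate the join key, so UserID appears only once in the result; a minimal sketch:)

# Joining on the column name keeps a single UserID column in the output
df_joined = df_sum.join(df_piv, "UserID")
df_joined.show()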
I don't think this is a real improvement, but you can add the total number of plays
...
    .select(
        F.col("*"),
        F.row_number().over(rowNumberWindow).alias("GameNumber"),
        # orderBy() with no columns drops the ordering, so the default frame
        # covers the whole UserID partition and the sum is the per-user total
        F.sum("Plays").over(rowNumberWindow.orderBy()).alias("total_plays")
    )
...
and later use it as a secondary grouping column for the pivot:
...
    .groupBy("UserID", "total_plays")
    .pivot("GameCol", ["MaxScoreGame1", "MaxScoreGame2"])
    .agg(F.max("Score"))
...
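Putting the two fragments above together, the full join-free pipeline would look something like the sketch below (built from the snippets above; userWindow is simply an unordered per-user window, which behaves the same as the rowNumberWindow.orderBy() trick):

rowNumberWindow = Window.partitionBy("UserID").orderBy(F.col("Time"))
userWindow = Window.partitionBy("UserID")  # no ordering: frame spans the whole partition

df_result = (df
    .groupBy("UserID", "GameID")
    .agg(F.sum("Plays").alias("Plays"),
         F.max("Score").alias("Score"),
         F.min("Time").alias("Time"))
    .select(
        F.col("*"),
        F.row_number().over(rowNumberWindow).alias("GameNumber"),
        F.sum("Plays").over(userWindow).alias("total_plays"))
    .filter(F.col("GameNumber") <= F.lit(2))
    .withColumn("GameCol", F.concat(F.lit("MaxScoreGame"), F.col("GameNumber")))
    .groupBy("UserID", "total_plays")
    .pivot("GameCol", ["MaxScoreGame1", "MaxScoreGame2"])
    .agg(F.max("Score"))
    .withColumnRenamed("total_plays", "Plays")
    .select("UserID", "MaxScoreGame1", "MaxScoreGame2", "Plays")
)
df_result.show()

On the sample data this should reproduce the desired output from the question, without the join.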