从后续项目聚合第一个分组项目

Aggregate First Grouped Item from Subsequent Items

我的用户游戏会话包含:用户 ID、游戏 ID、得分和玩游戏时的时间戳。

from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import functions as F

sc = SparkContext("local")

sqlContext = HiveContext(sc)

df = sqlContext.createDataFrame([
    ("u1", "g1", 10, 0),
    ("u1", "g3", 2, 2),
    ("u1", "g3", 5, 3),
    ("u1", "g4", 5, 4),
    ("u2", "g2", 1, 1),
], ["UserID", "GameID", "Score", "Time"])

期望输出

+------+-------------+-------------+
|UserID|MaxScoreGame1|MaxScoreGame2|
+------+-------------+-------------+
|    u1|           10|            5|
|    u2|            1|         null|
+------+-------------+-------------+

我想转换数据,以便获得用户玩的第一场比赛的最高分以及第二场比赛的最高分(如果我还可以获得所有后续比赛的最高分,则奖励) .不幸的是,我不确定如何使用 Spark SQL。

我知道我可以按 UserID、GameID 分组,然后聚合以获得最高分数和最短时间。不确定如何从那里继续。

澄清:注意MaxScoreGame1和MaxScoreGame2指的是第一和第二游戏用户玩家;不是游戏ID。

您可以尝试结合使用 Window 函数和 Pivot。

  1. 获取按时间排序的用户 ID 分区的每个游戏的行号。
  2. 过滤到 GameNumber 为 1 或 2。
  3. 以此为轴以获得所需的输出形状。

不幸的是,我使用的不是 python 的 scala,但下面的内容应该很容易转移到 python 库。

import org.apache.spark.sql.expressions.Window

// Use a window function to get row number
val rowNumberWindow = Window.partitionBy(col("UserId")).orderBy(col("Time"))  

val output = {
  df
    .select(
      col("*"),
      row_number().over(rowNumberWindow).alias("GameNumber")
    )
    .filter(col("GameNumber") <= lit(2))
    .groupBy(col("UserId"))
    .pivot("GameNumber")
    .agg(
      sum(col("Score"))
    )
}

output.show()

+------+---+----+
|UserId|  1|   2|
+------+---+----+
|    u1| 10|   2|
|    u2|  1|null|
+------+---+----+

PySpark 解决方案:

from pyspark.sql import Window

rowNumberWindow = Window.partitionBy("UserID").orderBy(F.col("Time"))

(df
 .groupBy("UserID", "GameID")
 .agg(F.max("Score").alias("Score"),
      F.min("Time").alias("Time"))
 .select(F.col("*"),
         F.row_number().over(rowNumberWindow).alias("GameNumber"))
 .filter(F.col("GameNumber") <= F.lit(2))
 .withColumn("GameMaxScoreCol", F.concat(F.lit("MaxScoreGame"), F.col("GameNumber")))
 .groupBy("UserID")
 .pivot("GameMaxScoreCol")  
 .agg(F.max("Score"))
).show()

+------+-------------+-------------+
|UserID|MaxScoreGame1|MaxScoreGame2|
+------+-------------+-------------+
|    u1|           10|            5|
|    u2|            1|         null|
+------+-------------+-------------+