在 groupby 时,为 Spark 数据帧中多个名称的特定时间分辨率构建事件计数的最佳方法是什么?

What is the best way to build event counts for certain time resolution over multiple names in Spark dataframe while groupby?

假设我有以下 Spark 框架:

+-------------------+--------+
|timestamp          |UserName|
+-------------------+--------+
|2021-08-11 04:05:06|A       |
|2021-08-11 04:15:06|B       |
|2021-08-11 09:15:26|A       |
|2021-08-11 11:04:06|B       |
|2021-08-11 14:55:16|A       |
|2021-08-13 04:12:11|B       |
+-------------------+--------+

我想根据每个用户的事件计数.

构建所需时间分辨率的时间序列数据

假设 24 小时(每天)的时间范围内,groupBy 后的预期结果应如下所示:

+------------------------------------------+-------------+-------------+
|window_frame_24_Hours                     | username A  | username B  |
+------------------------------------------+-------------+-------------+
|{2021-08-11 00:00:00, 2021-08-11 23:59:59}|3            |2            |
|{2021-08-12 00:00:00, 2021-08-12 23:59:59}|0            |0            |
|{2021-08-13 00:00:00, 2021-08-13 23:59:59}|0            |1            |
+------------------------------------------+-------------+-------------+

Edit1: 如果是 12 小时 frame\resolution:

+------------------------------------------+-------------+-------------+
|window_frame_12_Hours                     | username A  | username B  |
+------------------------------------------+-------------+-------------+
|{2021-08-11 00:00:00, 2021-08-11 11:59:59}|2            |2            |
|{2021-08-11 12:00:00, 2021-08-11 23:59:59}|1            |0            |
|{2021-08-12 00:00:00, 2021-08-12 11:59:59}|0            |0            |
|{2021-08-12 12:00:00, 2021-08-12 23:59:59}|0            |0            |
|{2021-08-13 00:00:00, 2021-08-13 11:59:59}|0            |1            |
|{2021-08-13 12:00:00, 2021-08-13 23:59:59}|0            |0            |
+------------------------------------------+-------------+-------------+

按时间分组 window '1 day' + UserName 进行计数,然后按 window 帧和主用户名分组:

from pyspark.sql import functions as F

result = df.groupBy(
    F.window("timestamp", "1 day").alias("window_frame_24_Hours"),
    "UserName"
).count().groupBy("window_frame_24_Hours").pivot("UserName").agg(
   F.first("count")
).na.fill(0)

result.show(truncate=False)

#+------------------------------------------+---+---+
#|window_frame_24_Hours                     |A  |B  |
#+------------------------------------------+---+---+
#|{2021-08-13 00:00:00, 2021-08-14 00:00:00}|0  |1  |
#|{2021-08-11 00:00:00, 2021-08-12 00:00:00}|3  |2  |
#+------------------------------------------+---+---+

如果您需要缺失的日期,则必须使用 sequence 最小和最大 timestamp 生成所有日期,然后加入原始数据框:

intervals_df = df.withColumn(
    "timestamp",
    F.date_trunc("day", "timestamp")
).selectExpr(
    "sequence(min(timestamp), max(timestamp + interval 1 day), interval 1 day) as dates"
).select(
    F.explode(
        F.expr("transform(dates, (x, i) -> IF(i!=0, struct(date_trunc('dd', dates[i-1]) as start, dates[i] as end), null))")
    ).alias("frame")
).filter("frame is not null").crossJoin(
    df.select("UserName").distinct()
)

result = intervals_df.alias("a").join(
    df.alias("b"),
    F.col("timestamp").between(F.col("frame.start"), F.col("frame.end"))
    & (F.col("a.UserName") == F.col("b.UserName")),
    "left"
).groupBy(
    F.col("frame").alias("window_frame_24_Hours")
).pivot("a.UserName").agg(
    F.count("b.UserName")
)

result.show(truncate=False)

#+------------------------------------------+----------+----------+
#|window_frame_24_Hours                     |username_A|username_B|
#+------------------------------------------+----------+----------+
#|{2021-08-13 00:00:00, 2021-08-14 00:00:00}|0         |1         |
#|{2021-08-11 00:00:00, 2021-08-12 00:00:00}|3         |2         |
#|{2021-08-12 00:00:00, 2021-08-13 00:00:00}|0         |0         |
#+------------------------------------------+----------+----------+