What is the best way to build event counts for certain time resolution over multiple names in Spark dataframe while groupby?
Suppose I have the following Spark dataframe:
+-------------------+--------+
|timestamp |UserName|
+-------------------+--------+
|2021-08-11 04:05:06|A |
|2021-08-11 04:15:06|B |
|2021-08-11 09:15:26|A |
|2021-08-11 11:04:06|B |
|2021-08-11 14:55:16|A |
|2021-08-13 04:12:11|B |
+-------------------+--------+
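For reference, a minimal sketch of how this sample dataframe could be constructed (the spark session variable and the to_timestamp cast are assumptions for reproducibility, not part of the original post):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Sample data copied from the table above; the string column is parsed into a
# real timestamp so that window/date functions can be applied later
df = spark.createDataFrame(
    [
        ("2021-08-11 04:05:06", "A"),
        ("2021-08-11 04:15:06", "B"),
        ("2021-08-11 09:15:26", "A"),
        ("2021-08-11 11:04:06", "B"),
        ("2021-08-11 14:55:16", "A"),
        ("2021-08-13 04:12:11", "B"),
    ],
    ["timestamp", "UserName"],
).withColumn("timestamp", F.to_timestamp("timestamp"))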
I want to build time-series data of event counts per user at a desired time resolution.
- Note 1: the time frames need to stay consistent with the Spark dataframe after grouping by UserName and counting at the desired time frame/resolution (maybe using event-time aggregation and watermarking from Apache Spark's Structured Streaming).
- Note 2: missing gaps within the given time range need to be filled in, substituting 0 where there are no events.
- Note 3: I am not interested in using a UDF or hacking it via toPandas().
Assuming a 24-hour (daily) time frame, the expected result after groupBy should look like this:
+------------------------------------------+-------------+-------------+
|window_frame_24_Hours | username A | username B |
+------------------------------------------+-------------+-------------+
|{2021-08-11 00:00:00, 2021-08-11 23:59:59}|3 |2 |
|{2021-08-12 00:00:00, 2021-08-12 23:59:59}|0 |0 |
|{2021-08-13 00:00:00, 2021-08-13 23:59:59}|0 |1 |
+------------------------------------------+-------------+-------------+
Edit 1: in case of a 12-hour frame/resolution:
+------------------------------------------+-------------+-------------+
|window_frame_12_Hours | username A | username B |
+------------------------------------------+-------------+-------------+
|{2021-08-11 00:00:00, 2021-08-11 11:59:59}|2 |2 |
|{2021-08-11 12:00:00, 2021-08-11 23:59:59}|1 |0 |
|{2021-08-12 00:00:00, 2021-08-12 11:59:59}|0 |0 |
|{2021-08-12 12:00:00, 2021-08-12 23:59:59}|0 |0 |
|{2021-08-13 00:00:00, 2021-08-13 11:59:59}|0 |1 |
|{2021-08-13 12:00:00, 2021-08-13 23:59:59}|0 |0 |
+------------------------------------------+-------------+-------------+
Group by the time window '1 day' + UserName to get the counts, then group by the window frame and pivot on UserName:
from pyspark.sql import functions as F

# Count events per (1-day window, UserName), then pivot the user names into
# columns and fill windows where a user had no events with 0
result = df.groupBy(
    F.window("timestamp", "1 day").alias("window_frame_24_Hours"),
    "UserName"
).count().groupBy("window_frame_24_Hours").pivot("UserName").agg(
    F.first("count")
).na.fill(0)
result.show(truncate=False)
#+------------------------------------------+---+---+
#|window_frame_24_Hours |A |B |
#+------------------------------------------+---+---+
#|{2021-08-13 00:00:00, 2021-08-14 00:00:00}|0 |1 |
#|{2021-08-11 00:00:00, 2021-08-12 00:00:00}|3 |2 |
#+------------------------------------------+---+---+
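For the 12-hour frame/resolution from Edit 1, the same pattern should work by just changing the window duration (a sketch based on the code above; result_12h is an illustrative name, not from the original answer):

result_12h = df.groupBy(
    F.window("timestamp", "12 hours").alias("window_frame_12_Hours"),
    "UserName"
).count().groupBy("window_frame_12_Hours").pivot("UserName").agg(
    F.first("count")
).na.fill(0)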
If you need the missing dates, you have to generate all the dates between the min and max timestamp using sequence, then join with the original dataframe:
# Build every 1-day interval between the min and max day-truncated timestamps,
# then cross join with the distinct user names so each user gets every interval
intervals_df = df.withColumn(
    "timestamp",
    F.date_trunc("day", "timestamp")
).selectExpr(
    "sequence(min(timestamp), max(timestamp + interval 1 day), interval 1 day) as dates"
).select(
    F.explode(
        F.expr("transform(dates, (x, i) -> IF(i!=0, struct(date_trunc('dd', dates[i-1]) as start, dates[i] as end), null))")
    ).alias("frame")
).filter("frame is not null").crossJoin(
    df.select("UserName").distinct()
)

# Left join the events onto the intervals, then pivot the per-user counts;
# intervals with no matching events end up with a count of 0
result = intervals_df.alias("a").join(
    df.alias("b"),
    F.col("timestamp").between(F.col("frame.start"), F.col("frame.end"))
    & (F.col("a.UserName") == F.col("b.UserName")),
    "left"
).groupBy(
    F.col("frame").alias("window_frame_24_Hours")
).pivot("a.UserName").agg(
    F.count("b.UserName")
)
result.show(truncate=False)
#+------------------------------------------+----------+----------+
#|window_frame_24_Hours |username_A|username_B|
#+------------------------------------------+----------+----------+
#|{2021-08-13 00:00:00, 2021-08-14 00:00:00}|0 |1 |
#|{2021-08-11 00:00:00, 2021-08-12 00:00:00}|3 |2 |
#|{2021-08-12 00:00:00, 2021-08-13 00:00:00}|0 |0 |
#+------------------------------------------+----------+----------+
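If the gap filling is also needed at the 12-hour resolution, the same sequence/join pattern should carry over by stepping the sequence with interval 12 hours instead of interval 1 day (a hedged sketch reusing df from above; intervals_12h and result_12h_filled are illustrative names):

# Generate every 12-hour interval spanning the data, one row per (interval, user).
# Note: no date_trunc on dates[i-1] here, since the 12:00 boundaries must be kept.
intervals_12h = df.withColumn(
    "timestamp",
    F.date_trunc("day", "timestamp")
).selectExpr(
    "sequence(min(timestamp), max(timestamp + interval 1 day), interval 12 hours) as dates"
).select(
    F.explode(
        F.expr("transform(dates, (x, i) -> IF(i!=0, struct(dates[i-1] as start, dates[i] as end), null))")
    ).alias("frame")
).filter("frame is not null").crossJoin(
    df.select("UserName").distinct()
)

# Left join each event into its 12-hour interval and pivot the per-user counts
result_12h_filled = intervals_12h.alias("a").join(
    df.alias("b"),
    F.col("timestamp").between(F.col("frame.start"), F.col("frame.end"))
    & (F.col("a.UserName") == F.col("b.UserName")),
    "left"
).groupBy(
    F.col("frame").alias("window_frame_12_Hours")
).pivot("a.UserName").agg(
    F.count("b.UserName")
)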