How to aggregate using window instead of Pyspark groupBy
I'm having trouble aggregating per user (user IDs 110 and 222 in my example) with a window function instead of GroupBy:
1- count the number of rows for each p_uuid
2- create new columns with the min and max timestamp for each p_uuid
from pyspark.sql import Window
from pyspark.sql import functions as F

df = spark.createDataFrame([(1, 110, 'aaa', 'walk', 'work', '2019-09-28 13:40:19-04:00'),
(2, 110, 'aaa', 'walk', 'work', '2019-09-28 13:40:19-04:01'),
(3, 110, 'aaa', 'walk', 'work', '2019-09-28 13:40:19-04:02'),
(4, 110, 'aaa', 'metro', 'work', '2019-09-28 13:41:19-04:00'),
(5, 110, 'aaa', 'metro', 'work', '2019-09-28 13:41:19-04:01'),
(6, 110, 'aaa', 'walk', 'work', '2019-09-28 13:42:19-04:00'),
(7, 110, 'aaa', 'walk', 'work', '2019-09-28 13:42:19-04:01'),
(8, 110, 'bbb', 'bike', 'home', '2019-09-17 14:40:19-04:00'),
(9, 110, 'bbb', 'bus', 'home', '2019-09-17 14:41:19-04:00'),
(10, 110, 'bbb', 'walk', 'home', '2019-09-17 14:43:19-04:00'),
(16, 110, 'ooo', None, None, '2019-08-29 16:01:19-04:00'),
(17, 110, 'ooo', None, None, '2019-08-29 16:02:19-04:00'),
(18, 110, 'ooo', None, None, '2019-08-29 16:02:19-04:00'),
(19, 222, 'www', 'car', 'work', '2019-09-28 08:00:19-04:00'),
(20, 222, 'www', 'metro', 'work', '2019-09-28 08:01:19-04:00'),
(21, 222, 'www', 'walk', 'work', '2019-09-28 08:02:19-04:00'),
(22, 222, 'xxx', 'walk', 'friend', '2019-09-17 08:40:19-04:00'),
(23, 222, 'xxx', 'bike', 'friend', '2019-09-17 08:42:19-04:00'),
(24, 222, 'xxx', 'bus', 'friend', '2019-09-17 08:43:19-04:00'),
(30, 222, 'ooo', None, None, '2019-08-29 10:00:19-04:00'),
(31, 222, 'ooo', None, None, '2019-08-29 10:01:19-04:00'),
(32, 222, 'ooo', None, None, '2019-08-29 10:02:19-04:00')],
['idx', 'u_uuid', 'p_uuid', 'mode', 'place', 'timestamp']
)
df.show(30, False)
I have used:
win = Window.partitionBy("u_uuid", "p_uuid").orderBy("timestamp")
df.withColumn("count_", F.count('p_uuid').over(win))
df.withColumn("max_timestamp", F.max("timestamp").over(win))
df.withColumn("min_timestamp", F.min("timestamp").over(win))
It doesn't seem to work (e.g., getting max_).
Note: forget about the trip_id, subtrip_id, and track_id columns.
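For reference, this is the groupBy version I'm trying to replace; it needs a join to put the aggregates back onto every row, which is exactly what I'd like the window to avoid (a minimal sketch, using only standard pyspark.sql.functions):

# groupBy equivalent: one row per (u_uuid, p_uuid) with the three aggregates
agg = (df.groupBy("u_uuid", "p_uuid")
         .agg(F.count("p_uuid").alias("count_"),
              F.min("timestamp").alias("min_timestamp"),
              F.max("timestamp").alias("max_timestamp")))
# join needed to get the aggregates back onto every original row
df.join(agg, on=["u_uuid", "p_uuid"], how="left").show(30, False)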
You need to use unboundedPreceding, unboundedFollowing along with your .partitionBy clause; when an orderBy clause is provided, the frame defaults to unboundedPreceding, currentRow.
Add .rowsBetween to your window spec and run again:
win = Window.partitionBy("u_uuid", "p_uuid").orderBy("timestamp").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
Example:
df.withColumn("max_timestamp", max("timestamp").over(win)).show(10,False)
+---+------+------+-----+------+-------------------------+-------------------------+
|idx|u_uuid|p_uuid|mode |place |timestamp |max_timestamp |
+---+------+------+-----+------+-------------------------+-------------------------+
|8 |110 |bbb |bike |home |2019-09-17 14:40:19-04:00|2019-09-17 14:43:19-04:00|
|9 |110 |bbb |bus |home |2019-09-17 14:41:19-04:00|2019-09-17 14:43:19-04:00|
|10 |110 |bbb |walk |home |2019-09-17 14:43:19-04:00|2019-09-17 14:43:19-04:00|
|16 |110 |ooo |null |null |2019-08-29 16:01:19-04:00|2019-08-29 16:02:19-04:00|
|17 |110 |ooo |null |null |2019-08-29 16:02:19-04:00|2019-08-29 16:02:19-04:00|
|18 |110 |ooo |null |null |2019-08-29 16:02:19-04:00|2019-08-29 16:02:19-04:00|
+---+------+------+-----+------+-------------------------+-------------------------+
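To see why the original spec looked wrong: with an orderBy but no explicit frame, the window defaults to (unboundedPreceding, currentRow), so the aggregate is computed over a growing frame, i.e. a running max within each group rather than the group-wide max. A minimal sketch of the difference, using the question's df:

from pyspark.sql import Window
from pyspark.sql import functions as F

# default frame after orderBy: unboundedPreceding -> currentRow (running max)
win_default = Window.partitionBy("u_uuid", "p_uuid").orderBy("timestamp")

# explicit frame over the whole partition: true per-group max on every row
win_full = (Window.partitionBy("u_uuid", "p_uuid")
                  .orderBy("timestamp")
                  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

(df.withColumn("running_max", F.max("timestamp").over(win_default))
   .withColumn("group_max", F.max("timestamp").over(win_full))
   .show(30, False))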
You have to use rowsBetween to extend the window to the whole frame:
win = Window.partitionBy("u_uuid", "p_uuid").orderBy("timestamp").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
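For example, a minimal end-to-end sketch that adds all three columns the question asks for (row count, min and max timestamp per p_uuid) using this window; orderBy is kept for consistency with the question, though it is not strictly needed for these aggregates over the full frame:

from pyspark.sql import Window
from pyspark.sql import functions as F

win = (Window.partitionBy("u_uuid", "p_uuid")
             .orderBy("timestamp")
             .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

result = (df.withColumn("count_", F.count("p_uuid").over(win))
            .withColumn("min_timestamp", F.min("timestamp").over(win))
            .withColumn("max_timestamp", F.max("timestamp").over(win)))
result.show(30, False)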