基于不同列的 PySpark 分组和聚合?
PySpark Grouping and Aggregating based on A Different Column?
我正在解决一个问题,我有一个以下格式的数据集(出于示例目的替换了真实数据):
session
activity
timestamp
1
enter_store
2022-03-01 23:25:11
1
pay_at_cashier
2022-03-01 23:31:10
1
exit_store
2022-03-01 23:55:01
2
enter_store
2022-03-02 07:15:00
2
pay_at_cashier
2022-03-02 07:24:00
2
exit_store
2022-03-02 07:35:55
3
enter_store
2022-03-05 11:07:01
3
exit_store
2022-03-05 11:22:51
我希望能够根据在每个会话中观察到的模式计算这些事件的计数统计信息。例如,根据上面的table,观察到的每个模式的计数如下:
{
'enter_store -> pay_at_cashier -> exit_store': 2,
'enter_store -> exit_store': 1
}
我正尝试在 PySpark 中执行此操作,但我在找出最有效的方法来执行这种缺少某些步骤的模式匹配时遇到了一些问题。真正的问题涉及一个更大的数据集,其中包含约 1500 万个这样的事件。
我尝试过这样的逻辑:过滤整个 DF 以获取观察到 'enter_store' 的唯一会话,然后过滤该 DF 以获取观察到 'pay_at_cashier' 的唯一会话。这很好用,唯一的问题是我无法想到如何计算像 3 这样的会话,其中只有开始步骤和最后一步,但没有中间步骤。
显然,执行此暴力操作的一种方法是遍历每个会话并为其分配一个模式并递增一个计数器,但我正在寻找更有效和可扩展的方法来执行此操作。
如有任何建议或见解,我们将不胜感激。
对于 Spark 2.4+,你可以这样做
df = (df
.withColumn("flow", F.expr("sort_array(collect_list(struct(timestamp, activity)) over (partition by session))"))
.withColumn("flow", F.expr("concat_ws(' -> ', transform(flow, v -> v.activity))"))
.groupBy("flow").agg(F.countDistinct("session").alias("total_session"))
)
df.show(truncate=False)
# +-------------------------------------------+-------------+
# |flow |total_session|
# +-------------------------------------------+-------------+
# |enter_store -> pay_at_cashier -> exit_store|2 |
# |enter_store -> exit_store |1 |
# +-------------------------------------------+-------------+
第一个块是为每个 session[=] 收集 timestamp 及其 activity 的列表24=] 在有序数组中(确保 timestamp 是时间戳格式)基于其 timestamp 值。之后,使用 transform
函数仅使用数组中的 activity 值(如果需要,使用 concat_ws
将它们组合起来创建一个字符串)和按 activity 顺序对它们进行分组以获得不同的 sessions.
我正在解决一个问题,我有一个以下格式的数据集(出于示例目的替换了真实数据):
session | activity | timestamp |
---|---|---|
1 | enter_store | 2022-03-01 23:25:11 |
1 | pay_at_cashier | 2022-03-01 23:31:10 |
1 | exit_store | 2022-03-01 23:55:01 |
2 | enter_store | 2022-03-02 07:15:00 |
2 | pay_at_cashier | 2022-03-02 07:24:00 |
2 | exit_store | 2022-03-02 07:35:55 |
3 | enter_store | 2022-03-05 11:07:01 |
3 | exit_store | 2022-03-05 11:22:51 |
我希望能够根据在每个会话中观察到的模式计算这些事件的计数统计信息。例如,根据上面的table,观察到的每个模式的计数如下:
{
'enter_store -> pay_at_cashier -> exit_store': 2,
'enter_store -> exit_store': 1
}
我正尝试在 PySpark 中执行此操作,但我在找出最有效的方法来执行这种缺少某些步骤的模式匹配时遇到了一些问题。真正的问题涉及一个更大的数据集,其中包含约 1500 万个这样的事件。
我尝试过这样的逻辑:过滤整个 DF 以获取观察到 'enter_store' 的唯一会话,然后过滤该 DF 以获取观察到 'pay_at_cashier' 的唯一会话。这很好用,唯一的问题是我无法想到如何计算像 3 这样的会话,其中只有开始步骤和最后一步,但没有中间步骤。
显然,执行此暴力操作的一种方法是遍历每个会话并为其分配一个模式并递增一个计数器,但我正在寻找更有效和可扩展的方法来执行此操作。
如有任何建议或见解,我们将不胜感激。
对于 Spark 2.4+,你可以这样做
df = (df
.withColumn("flow", F.expr("sort_array(collect_list(struct(timestamp, activity)) over (partition by session))"))
.withColumn("flow", F.expr("concat_ws(' -> ', transform(flow, v -> v.activity))"))
.groupBy("flow").agg(F.countDistinct("session").alias("total_session"))
)
df.show(truncate=False)
# +-------------------------------------------+-------------+
# |flow |total_session|
# +-------------------------------------------+-------------+
# |enter_store -> pay_at_cashier -> exit_store|2 |
# |enter_store -> exit_store |1 |
# +-------------------------------------------+-------------+
第一个块是为每个 session[=] 收集 timestamp 及其 activity 的列表24=] 在有序数组中(确保 timestamp 是时间戳格式)基于其 timestamp 值。之后,使用 transform
函数仅使用数组中的 activity 值(如果需要,使用 concat_ws
将它们组合起来创建一个字符串)和按 activity 顺序对它们进行分组以获得不同的 sessions.