Pyspark 使用 Pivot 的用户保留的不同计数

Pyspark Distinct count of User Retention using Pivot

我正在尝试使用 Pyspark 创建用户保留 table,我可以将其传输到 AWS Glue 以创建一个 ETL 作业,我可以在 QuickSight 中使用 Athena 进行查询。

基本上,我有两个 table,一个是用户注册日期,一个是用户 activity 日期。然后将此注册日期与 activity 日期进行比较,以计算用户在注册后多长时间处于活动状态。此后,我想跟踪在某个月注册的用户中有多少在 0、1、2 周等之后处于活动状态。因此,我想计算第 0 周、第 1 周等之后的不同用户数,即不是正常群组 table 按月分组然后进行跟踪,这可能会导致用户 activity 在注册后 3 个月比注册后 2 个月更大。

table 的片段和期望的结果如下所示:

然而,

为了获得每月的注册数量,我只做了一个简单的 groupBy:

df_reg = df\
.sort(col('user_id').asc(), col('created_at').asc())\
.groupBy('registered_at_month')\
.agg(countDistinct('user_id').alias('reg'))

为了在每周之后获得不同的用户数,我对数据框应用了一个过滤器并循环了几周,然后应用了一个数据透视函数来获得 table:

retention = []

for week in weeks:
    print(week)
    df_out = df\
        .filter((col('diff_week') >= week))\
        .sort(col('user_id').asc(), col('created_at').asc())\
        .groupBy('registered_at_month')\
        .agg(countDistinct('user_id').alias('countDistinct'))\
        .withColumn('week', lit(week))

retention.append(df_out)

df_retention = functools.reduce(DataFrame.union, retention)
df_retention_2 = df_retention\
    .groupBy('registered_at_month')\
    .pivot('week')\
    .agg(first('countDistinct'))\
    .orderBy('registered_at_month')

有更简洁的方法吗? 最好不要使用 for 循环。此外,当输入数据变大并且每月有数千名用户注册和数百周 activity 时,数据透视功能将永远存在?最后,是否可以使用一些计算字段直接在 QuickSight 中完成?

非常感谢您的帮助!谢谢!

是的,有一种更高效的方法可以做到这一点。在 Spark 中,按聚合分组是昂贵的,因为它意味着一个洗牌阶段,当 Spark 在其执行者之间重组数据时。在您当前的代码中,您每周都在聚合,这意味着您正在执行 n+2 聚合,其中 n 是周数:注册用户数为一个,n 为每周一次,每周一次用于数据透视聚合。

您可以将其减少为两个聚合,方法是在同一聚合中对每周求和,而不是每周求和然后进行透视。这是代码:

from pyspark.sql import functions as F

result = df.groupby(
        F.date_format('registered_at', 'MMM').alias('Month'),
        F.col('user_id')
    ) \
    .agg(F.max('diff_week').alias('max_diff')) \
    .groupBy('Month') \
    .agg(
        F.countDistinct('user_id').alias('Registered'),
        *[F.sum((F.col('max_diff') >= week).cast('integer')).alias(str(week)) for week in weeks]
    ) \
    .orderBy('Month')

weeks 数组包含从 0 到 10 的整数,以及以下 df 数据帧:

+-------------+----------+---------+-------+
|registered_at|created_at|diff_week|user_id|
+-------------+----------+---------+-------+
|2021-08-01   |2021-08-01|0        |1      |
|2021-08-01   |2021-08-05|0        |1      |
|2021-08-01   |2021-08-18|2        |1      |
|2021-08-01   |2021-08-21|2        |1      |
|2021-08-01   |2021-09-15|6        |1      |
|2021-08-01   |2021-08-01|0        |2      |
|2021-08-01   |2021-08-09|1        |2      |
|2021-08-01   |2021-08-10|1        |2      |
|2021-08-01   |2021-08-19|2        |2      |
|2021-08-01   |2021-08-22|3        |2      |
|2021-08-02   |2021-08-02|0        |3      |
|2021-08-02   |2021-08-09|1        |3      |
|2021-08-02   |2021-08-30|4        |3      |
+-------------+----------+---------+-------+

您得到以下 result 输出:

+-----+----------+---+---+---+---+---+---+---+---+---+---+
|Month|Registered|0  |1  |2  |3  |4  |5  |6  |7  |8  |9  |
+-----+----------+---+---+---+---+---+---+---+---+---+---+
|Aug  |3         |3  |3  |3  |3  |2  |1  |1  |0  |0  |0  |
+-----+----------+---+---+---+---+---+---+---+---+---+---+

而且它会比您的解决方案更高效

注意:在聚合之前对数据帧进行排序是没有用的,因为聚合会重新排序数据。然而,这里没有坏处,因为 Spark Catalyst 在聚合之前忽略了那种排序。