How to group unassociated content

I have a Hive table that records user behavior,

like this:

userid  behavior  timestamp   url
1       view      1650022601  url1
1       click     1650022602  url2
1       click     1650022614  url3
1       view      1650022617  url4
1       click     1650022622  url5
1       view      1650022626  url7
2       view      1650022628  url8
2       view      1650022631  url9

About 400GB is added to the table every day.

I want to sort by timestamp in ascending order and group the rows that fall between one 'view' and the next 'view'. In the table above, the first 3 rows belong to the same group; then I subtract the timestamps, e.g. 1650022614 - 1650022601, to get the view duration.

How can I do this?

I tried the lag and lead functions, or Scala like this:

        val pairRDD: RDD[(Int, String)] = record.map(x => {
            // bump the partition counter whenever the line starts with a date string;
            // note: mutating a driver-side var inside map does not work reliably across executors
            if (StringUtil.isDateString(x.split("\\s+")(0))) {
                partition = partition + 1
                (partition, x)
            } else {
                (partition, x)
            }
        })

or Java like this:

        LongAccumulator part = spark.sparkContext().longAccumulator("part");

        // coalesce(1) forces everything through a single partition so the accumulator
        // increases in row order, which defeats parallelism
        JavaPairRDD<Long, Row> pairRDD = spark.sql(sql).coalesce(1).javaRDD().mapToPair((PairFunction<Row, Long, Row>) row -> {
            if ("pageview".equals(row.getAs("event"))) {
                part.add(1L);
            }
            return new Tuple2<>(part.value(), row);
        });

But this code performs very poorly when the dataset is large.

Please help me.

If you work with dataframes, you can build the partition using a window: sum a column that is 1 when the partition changes and 0 when it does not.

You can convert your RDD to a dataframe with the sparkSession.createDataFrame() method.
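For reference, a minimal sketch of that conversion, assuming an RDD of (userid, behavior, timestamp, url) tuples; the names spark and rdd are placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("group-views").getOrCreate()

// hypothetical RDD with the same columns as the Hive table
val rdd = spark.sparkContext.parallelize(Seq(
  (1, "view", 1650022601L, "url1"),
  (1, "click", 1650022602L, "url2")
))

// createDataFrame accepts an RDD of tuples; toDF assigns the column names
val df = spark.createDataFrame(rdd).toDF("userid", "behavior", "timestamp", "url")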

Back to your question. In your case, the partition changes every time the behavior column equals "view", so we can start from this condition:

import org.apache.spark.sql.functions.col

val df1 = df.withColumn("is_view", (col("behavior") === "view").cast("integer"))

You get the following dataframe:

+------+--------+----------+----+-------+
|userid|behavior|timestamp |url |is_view|
+------+--------+----------+----+-------+
|1     |view    |1650022601|url1|1      |
|1     |click   |1650022602|url2|0      |
|1     |click   |1650022614|url3|0      |
|1     |view    |1650022617|url4|1      |
|1     |click   |1650022622|url5|0      |
|1     |view    |1650022626|url7|1      |
|2     |view    |1650022628|url8|1      |
|2     |view    |1650022631|url9|1      |
+------+--------+----------+----+-------+

Then sum the is_view column over a window partitioned by userid and ordered by timestamp:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

val df2 = df1.withColumn("partition", sum("is_view").over(Window.partitionBy("userid").orderBy("timestamp")))

This gives the following dataframe:

+------+--------+----------+----+-------+---------+
|userid|behavior|timestamp |url |is_view|partition|
+------+--------+----------+----+-------+---------+
|1     |view    |1650022601|url1|1      |1        |
|1     |click   |1650022602|url2|0      |1        |
|1     |click   |1650022614|url3|0      |1        |
|1     |view    |1650022617|url4|1      |2        |
|1     |click   |1650022622|url5|0      |2        |
|1     |view    |1650022626|url7|1      |3        |
|2     |view    |1650022628|url8|1      |1        |
|2     |view    |1650022631|url9|1      |2        |
+------+--------+----------+----+-------+---------+

Then you only need to aggregate by userid and partition:

import org.apache.spark.sql.functions.{max, min}

val result = df2.groupBy("userid", "partition")
  .agg((max("timestamp") - min("timestamp")).as("duration"))

You get the following result:

+------+---------+--------+
|userid|partition|duration|
+------+---------+--------+
|1     |1        |13      |
|1     |2        |5       |
|1     |3        |0       |
|2     |1        |0       |
|2     |2        |0       |
+------+---------+--------+

The complete Scala code:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, max, min, sum}

val result = df
  .withColumn("is_view", (col("behavior") === "view").cast("integer"))
  .withColumn("partition", sum("is_view").over(Window.partitionBy("userid").orderBy("timestamp")))
  .groupBy("userid", "partition")
  .agg((max("timestamp") - min("timestamp")).as("duration"))
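
If your input lives in a Hive table, an end-to-end sketch could look like the following; the table name user_behavior and the output path are assumptions, replace them with your own:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, max, min, sum}

val spark = SparkSession.builder()
  .appName("view-duration")
  .enableHiveSupport()   // needed to read Hive tables
  .getOrCreate()

// hypothetical Hive table name
val df = spark.table("user_behavior")

val result = df
  .withColumn("is_view", (col("behavior") === "view").cast("integer"))
  .withColumn("partition", sum("is_view").over(Window.partitionBy("userid").orderBy("timestamp")))
  .groupBy("userid", "partition")
  .agg((max("timestamp") - min("timestamp")).as("duration"))

// hypothetical output location
result.write.mode("overwrite").parquet("/tmp/view_durations")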