How to group unassociated content
I have a Hive table that records user behavior, like this:
userid | behavior | timestamp | url |
---|---|---|---|
1 | view | 1650022601 | url1 |
1 | click | 1650022602 | url2 |
1 | click | 1650022614 | url3 |
1 | view | 1650022617 | url4 |
1 | click | 1650022622 | url5 |
1 | view | 1650022626 | url7 |
2 | view | 1650022628 | url8 |
2 | view | 1650022631 | url9 |
About 400 GB is added to the table every day.
I want to sort by timestamp in ascending order and group the rows that fall between one 'view' and the next 'view'. In the table above, the first 3 rows belong to the same group; then I subtract the timestamps, e.g. 1650022614 - 1650022601 = 13 seconds, as the view duration.
How can I do this?
I tried the lag and lead functions, and Scala like this:
import org.apache.spark.rdd.RDD

// `partition` is a driver-side mutable counter; each Spark task works on its
// own copy of it, so the numbering is not consistent across the cluster.
var partition = 0

val pairRDD: RDD[(Int, String)] = record.map(x => {
  if (StringUtil.isDateString(x.split("\\s+")(0))) {
    partition = partition + 1
    (partition, x)
  } else {
    (partition, x)
  }
})
Or Java like this:
LongAccumulator part = spark.sparkContext().longAccumulator("part");
// coalesce(1) forces all rows through a single partition so the accumulator is
// incremented in row order, which is exactly why this is slow on a large dataset.
JavaPairRDD<Long, Row> pairRDD = spark.sql(sql).coalesce(1).javaRDD()
        .mapToPair((PairFunction<Row, Long, Row>) row -> {
            // compare string contents with equals(), not ==
            if ("pageview".equals(row.getAs("event"))) {
                part.add(1L);
            }
            return new Tuple2<>(part.value(), row);
        });
But this approach performs very poorly when the dataset is this large.
Please help.
If you work with DataFrames, you can build your partitions with a window, summing a column whose value is 1 whenever you change partition and 0 when you don't.
You can convert your RDD to a DataFrame with the sparkSession.createDataFrame() method.
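For instance, here is a minimal sketch of that conversion, assuming your records are already parsed into a case class (the UserEvent type and the two sample rows are purely illustrative):

import org.apache.spark.sql.SparkSession

// Hypothetical record type, used only for this illustration.
case class UserEvent(userid: Int, behavior: String, timestamp: Long, url: String)

val spark = SparkSession.builder().appName("view-duration").getOrCreate()

// A small RDD standing in for your parsed Hive/log data.
val rdd = spark.sparkContext.parallelize(Seq(
  UserEvent(1, "view", 1650022601L, "url1"),
  UserEvent(1, "click", 1650022602L, "url2")
))

// createDataFrame infers the schema from the case class fields.
val df = spark.createDataFrame(rdd)
df.printSchema()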
Back to your question: in your case, you change partition every time the behavior column equals "view". So we can start from this condition:
import org.apache.spark.sql.functions.col
val df1 = df.withColumn("is_view", (col("behavior") === "view").cast("integer"))
You get the following DataFrame:
+------+--------+----------+----+-------+
|userid|behavior|timestamp |url |is_view|
+------+--------+----------+----+-------+
|1 |view |1650022601|url1|1 |
|1 |click |1650022602|url2|0 |
|1 |click |1650022614|url3|0 |
|1 |view |1650022617|url4|1 |
|1 |click |1650022622|url5|0 |
|1 |view |1650022626|url7|1 |
|2 |view |1650022628|url8|1 |
|2 |view |1650022631|url9|1 |
+------+--------+----------+----+-------+
Then sum the is_view column over a window ordered by timestamp:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum
val df2 = df1.withColumn("partition", sum("is_view").over(Window.partitionBy("userid").orderBy("timestamp")))
This gives you the following DataFrame:
+------+--------+----------+----+-------+---------+
|userid|behavior|timestamp |url |is_view|partition|
+------+--------+----------+----+-------+---------+
|1 |view |1650022601|url1|1 |1 |
|1 |click |1650022602|url2|0 |1 |
|1 |click |1650022614|url3|0 |1 |
|1 |view |1650022617|url4|1 |2 |
|1 |click |1650022622|url5|0 |2 |
|1 |view |1650022626|url7|1 |3 |
|2 |view |1650022628|url8|1 |1 |
|2 |view |1650022631|url9|1 |2 |
+------+--------+----------+----+-------+---------+
Then you only have to aggregate by userid and partition:
import org.apache.spark.sql.functions.{max, min}
val result = df2.groupBy("userid", "partition")
.agg((max("timestamp") - min("timestamp")).as("duration"))
And you get the following result:
+------+---------+--------+
|userid|partition|duration|
+------+---------+--------+
|1 |1 |13 |
|1 |2 |5 |
|1 |3 |0 |
|2 |1 |0 |
|2 |2 |0 |
+------+---------+--------+
Complete Scala code:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, max, min, sum}
val result = df
.withColumn("is_view", (col("behavior") === "view").cast("integer"))
.withColumn("partition", sum("is_view").over(Window.partitionBy("userid").orderBy("timestamp")))
.groupBy("userid", "partition")
.agg((max("timestamp") - min("timestamp")).as("duration"))
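If it helps, here is a rough end-to-end usage sketch that rebuilds the sample data from the question with toDF and runs the pipeline above (the session setup and the final show call are just for illustration):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, max, min, sum}

val spark = SparkSession.builder().appName("view-duration").getOrCreate()
import spark.implicits._

// Recreate the sample rows from the question.
val df = Seq(
  (1, "view", 1650022601L, "url1"),
  (1, "click", 1650022602L, "url2"),
  (1, "click", 1650022614L, "url3"),
  (1, "view", 1650022617L, "url4"),
  (1, "click", 1650022622L, "url5"),
  (1, "view", 1650022626L, "url7"),
  (2, "view", 1650022628L, "url8"),
  (2, "view", 1650022631L, "url9")
).toDF("userid", "behavior", "timestamp", "url")

val result = df
  .withColumn("is_view", (col("behavior") === "view").cast("integer"))
  .withColumn("partition", sum("is_view").over(Window.partitionBy("userid").orderBy("timestamp")))
  .groupBy("userid", "partition")
  .agg((max("timestamp") - min("timestamp")).as("duration"))

result.orderBy("userid", "partition").show(false)

Note that because the window is partitioned by userid, the cumulative sum is computed independently per user, so unlike the coalesce(1) attempt in the question, no single task has to process the whole table.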