如何使结构化流中的 dropDuplicates 状态过期以避免 OOM?
How to expire state of dropDuplicates in structured streaming to avoid OOM?
我想使用spark structured streaming来统计每天的唯一访问量,所以我使用了下面的代码
.dropDuplicates("uuid")
并且在第二天应该删除为今天维护的状态,以便我可以获得第二天的唯一访问的正确计数并避免 OOM。 spark文档说明使用带水印的dropDuplicates,例如:
.withWatermark("timestamp", "1 day")
.dropDuplicates("uuid", "timestamp")
但水印列必须在dropDuplicates中指定。在这种情况下,uuid 和时间戳将用作组合键,以对具有相同 uuid 和时间戳的元素进行重复数据删除,这不是我所期望的。
那么有完美的解决方案吗?
经过几天的努力终于自己摸索出来了
在研究watermark和dropDuplicates的源码时,发现watermark除了一个eventTime列外,还支持window 列,因此我们可以使用以下代码:
.select(
window($"timestamp", "1 day"),
$"timestamp",
$"uuid"
)
.withWatermark("window", "1 day")
.dropDuplicates("uuid", "window")
由于同一天的所有事件都具有相同的 window,这将产生与仅使用 uuid 去重相同的结果。希望能对某人有所帮助。
下面是对Spark文档中提出的程序的修改。技巧是操纵事件时间,即将事件时间放入
水桶。假设事件时间以毫秒为单位提供。
// removes all duplicates that are in 15 minutes tumbling window.
// doesn't remove duplicates that are in different 15 minutes windows !!!!
public static Dataset<Row> removeDuplicates(Dataset<Row> df) {
// converts time in 15 minute buckets
// timestamp - (timestamp % (15 * 60))
Column bucketCol = functions.to_timestamp(
col("event_time").divide(1000).minus((col("event_time").divide(1000)).mod(15*60)));
df = df.withColumn("bucket", bucketCol);
String windowDuration = "15 minutes";
df = df.withWatermark("bucket", windowDuration)
.dropDuplicates("uuid", "bucket");
return df.drop("bucket");
}
我发现 window 功能不起作用,所以我选择使用 window.start 或 window.end。
.select(
window($"timestamp", "1 day").start,
$"timestamp",
$"uuid"
)
.withWatermark("window", "1 day")
.dropDuplicates("uuid", "window")
我想使用spark structured streaming来统计每天的唯一访问量,所以我使用了下面的代码
.dropDuplicates("uuid")
并且在第二天应该删除为今天维护的状态,以便我可以获得第二天的唯一访问的正确计数并避免 OOM。 spark文档说明使用带水印的dropDuplicates,例如:
.withWatermark("timestamp", "1 day")
.dropDuplicates("uuid", "timestamp")
但水印列必须在dropDuplicates中指定。在这种情况下,uuid 和时间戳将用作组合键,以对具有相同 uuid 和时间戳的元素进行重复数据删除,这不是我所期望的。
那么有完美的解决方案吗?
经过几天的努力终于自己摸索出来了
在研究watermark和dropDuplicates的源码时,发现watermark除了一个eventTime列外,还支持window 列,因此我们可以使用以下代码:
.select(
window($"timestamp", "1 day"),
$"timestamp",
$"uuid"
)
.withWatermark("window", "1 day")
.dropDuplicates("uuid", "window")
由于同一天的所有事件都具有相同的 window,这将产生与仅使用 uuid 去重相同的结果。希望能对某人有所帮助。
下面是对Spark文档中提出的程序的修改。技巧是操纵事件时间,即将事件时间放入 水桶。假设事件时间以毫秒为单位提供。
// removes all duplicates that are in 15 minutes tumbling window.
// doesn't remove duplicates that are in different 15 minutes windows !!!!
public static Dataset<Row> removeDuplicates(Dataset<Row> df) {
// converts time in 15 minute buckets
// timestamp - (timestamp % (15 * 60))
Column bucketCol = functions.to_timestamp(
col("event_time").divide(1000).minus((col("event_time").divide(1000)).mod(15*60)));
df = df.withColumn("bucket", bucketCol);
String windowDuration = "15 minutes";
df = df.withWatermark("bucket", windowDuration)
.dropDuplicates("uuid", "bucket");
return df.drop("bucket");
}
我发现 window 功能不起作用,所以我选择使用 window.start 或 window.end。
.select(
window($"timestamp", "1 day").start,
$"timestamp",
$"uuid"
)
.withWatermark("window", "1 day")
.dropDuplicates("uuid", "window")