Kafka Streams - 过滤一段时间内频繁出现的消息 window
Kafka Streams - Filter messages that appear frequently in a time window
我正在尝试过滤在长度为 T
.[=31= 的给定(跳跃)时间 window 内密钥出现频率超过阈值 N
的任何消息]
例如,在以下流中:
#time, key
0, A
1, B
2, A
3, C
4, D
5, A
6, B
7, C
8, C
9, D
10, A
11, D
12, D
13, D
14, D
15, D
和N=2
和T=3
,结果应该是
0, A
2, A
7, C
8, C
9, D
11, D
12, D
13, D
14, D
15, D
或者,如果以上不可能,则简化为仅在达到阈值后过滤消息:
#time, key
2, A
8, C
11, D
12, D
13, D
14, D
15, D
Kafka Streams 可以吗?
到目前为止,我已尝试创建流的 windowed count
(KTable
的实例)并将其连接回原始流。我使用 KTable#toStream((k,v) -> k.key())
将 windowed count
的密钥更改回原始密钥并执行 dummy aggregation 返回到 KTable
的实例。这似乎引入了延迟,导致 leftJoin
错过超过阈值后非常接近的消息。
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
KStream<String, Long> wcount = source.groupByKey()
.count(TimeWindows.of(TimeUnit.SECONDS.toMillis(5)),"Counts")
.toStream((k,v) -> k.key());
// perform dummy aggregation to get KTable
KTable<String, Long> wcountTable = wcount.groupByKey(stringSerde, longSerde)
.reduce((aggValue, newValue) -> newValue,
"dummy-aggregation-store");
// left join and filter with threshold N=1
source.leftJoin(wcountTable, (leftValue, rightValue) -> rightValue,stringSerde, stringSerde )
.filter((k,v) -> v!=null)
.filter((k,v) -> v>1)
.print("output");
我也尝试过执行 KStream
-KStream
加入适当的 window(忽略虚拟聚合):
source.join(wcount, (leftValue, rightValue) -> rightValue, JoinWindows.of(TimeUnit.SECONDS.toMillis(5)),stringSerde, stringSerde, longSerde)
.filter((k,v) -> v!=null)
.filter((k,v) -> v>1)
.print("output");
这会导致重复输出,因为每个 UPSERT
到 wcount
都会触发一个事件。
据我所知,这非常适合 Count-Min-Sketch 算法。参见例如 stream-lib 实现:
这当然是可能的。您可以应用 windowed 聚合来收集列表中的所有原始数据(即,您手动实现 window)。之后,您应用一个计算 window 的 flatMap。如果尚未达到阈值,则不会发出任何信号。如果第一次达到阈值,您将发出 all 缓冲数据。对于计数大于阈值的 flatMap 的所有进一步调用,您只发出列表中的最新一个(您知道您确实发出了所有其他调用之前对 flatMap 的调用,即仅发出新添加的一个)。
Note: you need to disable KTable cache, ie, set config parameter "cache.max.bytes.buffering" = 0. Otherwise, the algorithms won't work correctly.
像这样:
KStream<Windowed<K>, List<V>> windows = stream.groupByKey()
.aggregate(
/*init with empty list*/,
/*add value to list in agg*/,
TimeWindows.of()...),
...)
.toStream();
KStream<K,V> thresholdMetStream = windows.flatMap(
/* if List#size < threshold
then return empty-list, ie, nothing
elseif List#size == threshold
then return whole list
else [List#size > threshold]
then return last element from list
*/);
我正在尝试过滤在长度为 T
.[=31= 的给定(跳跃)时间 window 内密钥出现频率超过阈值 N
的任何消息]
例如,在以下流中:
#time, key
0, A
1, B
2, A
3, C
4, D
5, A
6, B
7, C
8, C
9, D
10, A
11, D
12, D
13, D
14, D
15, D
和N=2
和T=3
,结果应该是
0, A
2, A
7, C
8, C
9, D
11, D
12, D
13, D
14, D
15, D
或者,如果以上不可能,则简化为仅在达到阈值后过滤消息:
#time, key
2, A
8, C
11, D
12, D
13, D
14, D
15, D
Kafka Streams 可以吗?
到目前为止,我已尝试创建流的 windowed count
(KTable
的实例)并将其连接回原始流。我使用 KTable#toStream((k,v) -> k.key())
将 windowed count
的密钥更改回原始密钥并执行 dummy aggregation 返回到 KTable
的实例。这似乎引入了延迟,导致 leftJoin
错过超过阈值后非常接近的消息。
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
KStream<String, Long> wcount = source.groupByKey()
.count(TimeWindows.of(TimeUnit.SECONDS.toMillis(5)),"Counts")
.toStream((k,v) -> k.key());
// perform dummy aggregation to get KTable
KTable<String, Long> wcountTable = wcount.groupByKey(stringSerde, longSerde)
.reduce((aggValue, newValue) -> newValue,
"dummy-aggregation-store");
// left join and filter with threshold N=1
source.leftJoin(wcountTable, (leftValue, rightValue) -> rightValue,stringSerde, stringSerde )
.filter((k,v) -> v!=null)
.filter((k,v) -> v>1)
.print("output");
我也尝试过执行 KStream
-KStream
加入适当的 window(忽略虚拟聚合):
source.join(wcount, (leftValue, rightValue) -> rightValue, JoinWindows.of(TimeUnit.SECONDS.toMillis(5)),stringSerde, stringSerde, longSerde)
.filter((k,v) -> v!=null)
.filter((k,v) -> v>1)
.print("output");
这会导致重复输出,因为每个 UPSERT
到 wcount
都会触发一个事件。
据我所知,这非常适合 Count-Min-Sketch 算法。参见例如 stream-lib 实现:
这当然是可能的。您可以应用 windowed 聚合来收集列表中的所有原始数据(即,您手动实现 window)。之后,您应用一个计算 window 的 flatMap。如果尚未达到阈值,则不会发出任何信号。如果第一次达到阈值,您将发出 all 缓冲数据。对于计数大于阈值的 flatMap 的所有进一步调用,您只发出列表中的最新一个(您知道您确实发出了所有其他调用之前对 flatMap 的调用,即仅发出新添加的一个)。
Note: you need to disable KTable cache, ie, set config parameter "cache.max.bytes.buffering" = 0. Otherwise, the algorithms won't work correctly.
像这样:
KStream<Windowed<K>, List<V>> windows = stream.groupByKey()
.aggregate(
/*init with empty list*/,
/*add value to list in agg*/,
TimeWindows.of()...),
...)
.toStream();
KStream<K,V> thresholdMetStream = windows.flatMap(
/* if List#size < threshold
then return empty-list, ie, nothing
elseif List#size == threshold
then return whole list
else [List#size > threshold]
then return last element from list
*/);