Kafka Streams - 过滤一段时间内频繁出现的消息 window

Kafka Streams - Filter messages that appear frequently in a time window

我正在尝试过滤在长度为 T.[=31= 的给定(跳跃)时间 window 内密钥出现频率超过阈值 N 的任何消息]

例如,在以下流中:

#time, key
0, A
1, B
2, A
3, C
4, D
5, A
6, B
7, C
8, C
9, D
10, A
11, D
12, D
13, D
14, D
15, D

N=2T=3,结果应该是

0, A
2, A
7, C
8, C
9, D
11, D
12, D
13, D
14, D
15, D

或者,如果以上不可能,则简化为仅在达到阈值后过滤消息:

#time, key
2, A
8, C
11, D
12, D
13, D
14, D
15, D

Kafka Streams 可以吗?

到目前为止,我已尝试创建流的 windowed countKTable 的实例)并将其连接回原始流。我使用 KTable#toStream((k,v) -> k.key())windowed count 的密钥更改回原始密钥并执行 dummy aggregation 返回到 KTable 的实例。这似乎引入了延迟,导致 leftJoin 错过超过阈值后非常接近的消息。

    final Serde<String> stringSerde = Serdes.String();
    final Serde<Long> longSerde = Serdes.Long();

    KStream<String, Long> wcount = source.groupByKey()
            .count(TimeWindows.of(TimeUnit.SECONDS.toMillis(5)),"Counts")
            .toStream((k,v) -> k.key());

    // perform dummy aggregation to get KTable
    KTable<String, Long> wcountTable = wcount.groupByKey(stringSerde, longSerde)
                .reduce((aggValue, newValue) -> newValue, 
                 "dummy-aggregation-store");

    // left join and filter with threshold N=1
    source.leftJoin(wcountTable, (leftValue, rightValue) -> rightValue,stringSerde, stringSerde )
            .filter((k,v) -> v!=null)
            .filter((k,v) -> v>1)
            .print("output");

我也尝试过执行 KStream-KStream 加入适当的 window(忽略虚拟聚合):

    source.join(wcount, (leftValue, rightValue) -> rightValue, JoinWindows.of(TimeUnit.SECONDS.toMillis(5)),stringSerde, stringSerde, longSerde)
            .filter((k,v) -> v!=null)
            .filter((k,v) -> v>1)
            .print("output");

这会导致重复输出,因为每个 UPSERTwcount 都会触发一个事件。

据我所知,这非常适合 Count-Min-Sketch 算法。参见例如 stream-lib 实现:

https://github.com/addthis/stream-lib

这当然是可能的。您可以应用 windowed 聚合来收集列表中的所有原始数据(即,您手动实现 window)。之后,您应用一个计算 window 的 flatMap。如果尚未达到阈值,则不会发出任何信号。如果第一次达到阈值,您将发出 all 缓冲数据。对于计数大于阈值的 flatMap 的所有进一步调用,您只发出列表中的最新一个(您知道您确实发出了所有其他调用之前对 flatMap 的调用,即仅发出新添加的一个)。

Note: you need to disable KTable cache, ie, set config parameter "cache.max.bytes.buffering" = 0. Otherwise, the algorithms won't work correctly.

像这样:

KStream<Windowed<K>, List<V>> windows = stream.groupByKey()
                                              .aggregate(
                                                /*init with empty list*/,
                                                /*add value to list in agg*/,
                                                TimeWindows.of()...),
                                                ...)
                                                .toStream();
KStream<K,V> thresholdMetStream = windows.flatMap(
                                            /* if List#size < threshold
                                               then return empty-list, ie, nothing
                                               elseif List#size == threshold
                                               then return whole list
                                               else [List#size > threshold]
                                               then return last element from list
                                            */);