一分钟聚合 window 在 Kafka 2.4.0 中给出意外结果

One Minute aggregate window giving unexpected result in Kafka 2.4.0

我使用 https://github.com/gwenshap/kafka-streams-stockstats 中的代码 最新的 windowedBy()

final TimeWindows window = TimeWindows.of(Duration.ofMinutes(1));
......
 .windowedBy(window)

然后打印流

 stats.foreach((key, value) -> logger.info("Key >>>>> "+ key + 
                " Value => "+value.countTrades));

对于相同的键和 1 分钟窗口,我得到以下输出。我期待每 1 分钟窗口一次使用此密钥的单个记录。我在这里缺少什么?

> Line 817: [2021-03-29 15:40:21,444] INFO Key >>>>>> [AES@1617012600000/1617012660000] Value => 19
> Line 823: [2021-03-29 15:40:52,111] INFO Key >>>>>> [AES@1617012600000/1617012660000] Value => 43
> Line 837: [2021-03-29 15:41:24,076] INFO Key >>>>>> [AES@1617012600000/1617012660000] Value => 55

注:我也试过抑制()。

Suppress() 是前进的方向。 文章 (https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/) 解释了如何使用。 使用抑制的唯一警告是,在接下来的 window 中,必须使用您在 groupBy() 中使用的相同密钥接收新事件。

另请查看来自 Matthias J.Sax 的其他 Whosebug 的评论:

下面是上述 git 存储库中的特定代码更改,以获取时间窗口上特定键的单个值的结果。

   KStream<String, Trade> source = builder.stream(Constants.STOCK_TOPIC);
    final TimeWindows window = TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofSeconds(5));
    KStream<String, TradeStats> stats = 
            source
            .groupByKey()
            .windowedBy(window)
            .<TradeStats>aggregate(() -> new TradeStats(),(k, v, tradestats) -> tradestats.add(v),
                    Materialized.<String, TradeStats, WindowStore<Bytes, byte[]>>as("trade-aggregates")
                            .withValueSerde(new TradeStatsSerde()).withKeySerde(Serdes.String()))
            .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()).withName("trade-suppress"))
            .toStream()            
            .map((final Windowed<String> k, final TradeStats v) -> new KeyValue<>(k.key(), v), Named.as("trade-map"));;
    
   stats.foreach((key, value) -> logger.info("Key >>>>> "+ key + " Value => "+value.countTrades));