一分钟聚合 window 在 Kafka 2.4.0 中给出意外结果
One Minute aggregate window giving unexpected result in Kafka 2.4.0
我使用 https://github.com/gwenshap/kafka-streams-stockstats 中的代码
最新的 windowedBy()
final TimeWindows window = TimeWindows.of(Duration.ofMinutes(1));
......
.windowedBy(window)
然后打印流
stats.foreach((key, value) -> logger.info("Key >>>>> "+ key +
" Value => "+value.countTrades));
对于相同的键和 1 分钟窗口,我得到以下输出。我期待每 1 分钟窗口一次使用此密钥的单个记录。我在这里缺少什么?
> Line 817: [2021-03-29 15:40:21,444] INFO Key >>>>>> [AES@1617012600000/1617012660000] Value => 19
> Line 823: [2021-03-29 15:40:52,111] INFO Key >>>>>> [AES@1617012600000/1617012660000] Value => 43
> Line 837: [2021-03-29 15:41:24,076] INFO Key >>>>>> [AES@1617012600000/1617012660000] Value => 55
注:我也试过抑制()。
Suppress() 是前进的方向。
文章 (https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/) 解释了如何使用。
使用抑制的唯一警告是,在接下来的 window 中,必须使用您在 groupBy() 中使用的相同密钥接收新事件。
另请查看来自 Matthias J.Sax 的其他 Whosebug 的评论:
下面是上述 git 存储库中的特定代码更改,以获取时间窗口上特定键的单个值的结果。
KStream<String, Trade> source = builder.stream(Constants.STOCK_TOPIC);
final TimeWindows window = TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofSeconds(5));
KStream<String, TradeStats> stats =
source
.groupByKey()
.windowedBy(window)
.<TradeStats>aggregate(() -> new TradeStats(),(k, v, tradestats) -> tradestats.add(v),
Materialized.<String, TradeStats, WindowStore<Bytes, byte[]>>as("trade-aggregates")
.withValueSerde(new TradeStatsSerde()).withKeySerde(Serdes.String()))
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()).withName("trade-suppress"))
.toStream()
.map((final Windowed<String> k, final TradeStats v) -> new KeyValue<>(k.key(), v), Named.as("trade-map"));;
stats.foreach((key, value) -> logger.info("Key >>>>> "+ key + " Value => "+value.countTrades));
我使用 https://github.com/gwenshap/kafka-streams-stockstats 中的代码 最新的 windowedBy()
final TimeWindows window = TimeWindows.of(Duration.ofMinutes(1));
......
.windowedBy(window)
然后打印流
stats.foreach((key, value) -> logger.info("Key >>>>> "+ key +
" Value => "+value.countTrades));
对于相同的键和 1 分钟窗口,我得到以下输出。我期待每 1 分钟窗口一次使用此密钥的单个记录。我在这里缺少什么?
> Line 817: [2021-03-29 15:40:21,444] INFO Key >>>>>> [AES@1617012600000/1617012660000] Value => 19 > Line 823: [2021-03-29 15:40:52,111] INFO Key >>>>>> [AES@1617012600000/1617012660000] Value => 43 > Line 837: [2021-03-29 15:41:24,076] INFO Key >>>>>> [AES@1617012600000/1617012660000] Value => 55
注:我也试过抑制()。
Suppress() 是前进的方向。 文章 (https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/) 解释了如何使用。 使用抑制的唯一警告是,在接下来的 window 中,必须使用您在 groupBy() 中使用的相同密钥接收新事件。
另请查看来自 Matthias J.Sax 的其他 Whosebug 的评论:
下面是上述 git 存储库中的特定代码更改,以获取时间窗口上特定键的单个值的结果。
KStream<String, Trade> source = builder.stream(Constants.STOCK_TOPIC);
final TimeWindows window = TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofSeconds(5));
KStream<String, TradeStats> stats =
source
.groupByKey()
.windowedBy(window)
.<TradeStats>aggregate(() -> new TradeStats(),(k, v, tradestats) -> tradestats.add(v),
Materialized.<String, TradeStats, WindowStore<Bytes, byte[]>>as("trade-aggregates")
.withValueSerde(new TradeStatsSerde()).withKeySerde(Serdes.String()))
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()).withName("trade-suppress"))
.toStream()
.map((final Windowed<String> k, final TradeStats v) -> new KeyValue<>(k.key(), v), Named.as("trade-map"));;
stats.foreach((key, value) -> logger.info("Key >>>>> "+ key + " Value => "+value.countTrades));