会话 Windows 与 Kafka Stream 的行为不符合预期
Session Windows behave with Kafka Stream is not as expected
我是使用 kafka 流的新手,但我注意到的是我意想不到的行为。我开发了一个应用程序,它消耗了 6 个主题。我的目标是通过内部字段对每个主题的事件进行分组(或加入)。那工作正常。但我的问题是 window 时间,看起来每个周期的结束时间都会影响到所有聚合的时间。是否只有一个计时器用于所有聚合同时进行?我期待的是,当流获得配置的 30 秒时,就会退出聚合过程。我认为这是可能的,因为我已经看到有关 Windowed windowedRegion 变量的数据,并且每个流的 windowedRegion.window().start() 和 windowedRegion.window().end() 值都不同.
这是我的代码:
streamsBuilder
.stream(topicList, Consumed.with(Serdes.String(), Serdes.String()))
.groupBy(new MyGroupByKeyValueMapper(), Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(SessionWindows.with(windowInactivity).until(windowDuration))
.aggregate(
new MyInitializer(),
new MyAggregator(),
new MyMerger(),
Materialized.with(new Serdes.StringSerde(), new PaymentListSerde())
)
.mapValues(
new MyMapper()
)
.toStream(new MyKeyValueMapper())
.to(consolidationTopic,Produced.with(Serdes.String(), Serdes.String()));
我不确定这是否是您要问的,但每个聚合(每个 per-key 会话 window)可能确实会更新多次。您通常不会在每个 window 只收到一条关于您的 "consolidation" 主题的会话 window 的最终结果的消息。这在此处有更详细的解释:
我是使用 kafka 流的新手,但我注意到的是我意想不到的行为。我开发了一个应用程序,它消耗了 6 个主题。我的目标是通过内部字段对每个主题的事件进行分组(或加入)。那工作正常。但我的问题是 window 时间,看起来每个周期的结束时间都会影响到所有聚合的时间。是否只有一个计时器用于所有聚合同时进行?我期待的是,当流获得配置的 30 秒时,就会退出聚合过程。我认为这是可能的,因为我已经看到有关 Windowed windowedRegion 变量的数据,并且每个流的 windowedRegion.window().start() 和 windowedRegion.window().end() 值都不同. 这是我的代码:
streamsBuilder
.stream(topicList, Consumed.with(Serdes.String(), Serdes.String()))
.groupBy(new MyGroupByKeyValueMapper(), Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(SessionWindows.with(windowInactivity).until(windowDuration))
.aggregate(
new MyInitializer(),
new MyAggregator(),
new MyMerger(),
Materialized.with(new Serdes.StringSerde(), new PaymentListSerde())
)
.mapValues(
new MyMapper()
)
.toStream(new MyKeyValueMapper())
.to(consolidationTopic,Produced.with(Serdes.String(), Serdes.String()));
我不确定这是否是您要问的,但每个聚合(每个 per-key 会话 window)可能确实会更新多次。您通常不会在每个 window 只收到一条关于您的 "consolidation" 主题的会话 window 的最终结果的消息。这在此处有更详细的解释: