Kafka Stream Aggregator - 如何设置在发送消息之前等待聚合的时间?
Kafka Stream Aggregator - how to set time to wait in aggregations before spitting messages?
我的 Kafka Streams 聚合读取一个紧凑的主题并执行此操作:
(0_10, ..)
, (0_11, ..)
--->
(0, [10])
(0, [10, 11])
我想知道如何控制聚合时间-window,所以它不会为每个传入的消息吐出一条消息,而是等待并聚合其中的一些。想象一下 Stream App 使用这些消息:
(0_10, ..)
(1_11, ..)
(0_13, ..)
如果前 3 条消息在短时间内到达 window,我希望看到这个:
(0,[10])
(0, [10, 13])
(1, [11])
我不知道如何告诉我的 Kafka Stream 应用程序等待更多聚合的时间,然后再吐出一个新值。
我的代码很简单
builder
.table(keySerde, valueSerde, sourceTopic)
.groupBy(StreamBuilder::groupByMapper)
.aggregate(
StreamBuilder::aggregateInitializer,
StreamBuilder::aggregateAdder,
StreamBuilder::aggregateSubtractor)
.to(...);
目前,它有时会批量聚合,但不确定如何调整它:
{"Aggregate":[100]}
{"Aggregate":[100,300,301,302]}
{"Aggregate":[100,300,301,302,404]}
I would like to know how to control aggregation time-window, so it doesn't spit a message for each incoming message, but waits and aggregates some of them.
Kafka Streams 的 windowing 无法做到这一点。一般来说,Kafka Streams windows 不会 "close" 或 "end" 因为你不能告诉它一旦 window [=27= 就产生最终结果] (没有这样的概念)。这是为了适应迟到的结果。当消息到达聚合 window 时,您将看到更新。 Kafka Streams 吐出更新的频率取决于缓存(见下文)。有关更多信息,请参阅:
Currently, it sometime aggregates in batches, but not sure how to tweak it:
您在那里看到的很可能是支持 KTables
的存储中缓存的结果。 KTables
仅在刷新更新日志和提交偏移量时转发下游消息。这是为了在需要恢复它们的状态时保持一致性。如果您更改 Kafka Streams 应用程序的提交间隔,您的缓存刷新频率将会降低,因此您会看到从 KTable
s 转发的更新(变更日志、聚合等)更少。但这与 windowing.
无关
综上所述,如果您想对变更日志流进行 windowed 聚合,您可以使用 KTable#toStream()
将其从 KTable
转换为 KStream
.然后您可以在聚合步骤中指定 windows。
我的 Kafka Streams 聚合读取一个紧凑的主题并执行此操作:
(0_10, ..)
, (0_11, ..)
--->
(0, [10])
(0, [10, 11])
我想知道如何控制聚合时间-window,所以它不会为每个传入的消息吐出一条消息,而是等待并聚合其中的一些。想象一下 Stream App 使用这些消息:
(0_10, ..)
(1_11, ..)
(0_13, ..)
如果前 3 条消息在短时间内到达 window,我希望看到这个:
(0,[10])
(0, [10, 13])
(1, [11])
我不知道如何告诉我的 Kafka Stream 应用程序等待更多聚合的时间,然后再吐出一个新值。
我的代码很简单
builder
.table(keySerde, valueSerde, sourceTopic)
.groupBy(StreamBuilder::groupByMapper)
.aggregate(
StreamBuilder::aggregateInitializer,
StreamBuilder::aggregateAdder,
StreamBuilder::aggregateSubtractor)
.to(...);
目前,它有时会批量聚合,但不确定如何调整它:
{"Aggregate":[100]}
{"Aggregate":[100,300,301,302]}
{"Aggregate":[100,300,301,302,404]}
I would like to know how to control aggregation time-window, so it doesn't spit a message for each incoming message, but waits and aggregates some of them.
Kafka Streams 的 windowing 无法做到这一点。一般来说,Kafka Streams windows 不会 "close" 或 "end" 因为你不能告诉它一旦 window [=27= 就产生最终结果] (没有这样的概念)。这是为了适应迟到的结果。当消息到达聚合 window 时,您将看到更新。 Kafka Streams 吐出更新的频率取决于缓存(见下文)。有关更多信息,请参阅:
Currently, it sometime aggregates in batches, but not sure how to tweak it:
您在那里看到的很可能是支持 KTables
的存储中缓存的结果。 KTables
仅在刷新更新日志和提交偏移量时转发下游消息。这是为了在需要恢复它们的状态时保持一致性。如果您更改 Kafka Streams 应用程序的提交间隔,您的缓存刷新频率将会降低,因此您会看到从 KTable
s 转发的更新(变更日志、聚合等)更少。但这与 windowing.
综上所述,如果您想对变更日志流进行 windowed 聚合,您可以使用 KTable#toStream()
将其从 KTable
转换为 KStream
.然后您可以在聚合步骤中指定 windows。