避免写入 Kafka Streams 中的中间聚合存储
Avoid Writing into intemediate Aggregate Store in Kafka Streams
在我的例子中,我正在执行 hopping window
例如(100sec,1sec)
。
KTable<Windowed<String>, aggrTest> WinMinMax = Records.groupByKey().aggregate(new aggrTestInitilizer(),
new minMaxCalculator()
, TimeWindows.of(TimeUnit.SECONDS.toMillis(100)).advanceBy(TimeUnit.SECONDS.toMillis(1)),aggrMessageSerde);
但是这里 100 秒 window 的消息数量非常大,这导致 window 执行需要很长时间。因此,要缩短此 window 执行时间,
- 我想避免将数据写入中间聚合状态存储(kafka 流默认写入)。
- 此外,如果 (1) 不可能,那么我们可以将 window 生成的中间聚合状态存储在 RAM 而不是磁盘中吗?(相同的设置是什么?)
- 关于改进 window 执行时间的任何进一步建议?
不确定您是如何实现 MinMaxCalculator()
的,但我认为它只是将当前 min/max 与新值进行比较。因此,商店仅包含当前聚合。 -- 因此,window 大小根本无关紧要,因为与 window 大小无关,您只存储键和当前聚合结果。
回答您的问题:
- 根据设计,聚合需要一个存储来保存当前的聚合结果——因此,您不能删除一个存储。
- 是的,您可以使用内存存储。
aggregate()
方法有一个重载,允许您放置自定义存储,并且有实用程序 类 来创建内存存储。查看文档(顺便说一句:那些 API 在 Kafka 1.0 中得到了很多简化,所以如果你还没有使用 1.0,我建议升级):https://docs.confluent.io/current/streams/developer-guide/processor-api.html#enable-or-disable-fault-tolerance-of-state-stores-store-changelogs
- 如上所述,window 的大小不会影响您的计算 "speed"
Side notes:
- if you go with in-memory state instead of RocksDB, you limit the store size to your RAM size -- this might become a problem if your retention time is large as state can become quite big
- if you do rolling bounces, this will take more time as the state needs to be recreated by reading the full changelog topic -- RocksDB stores can recover from local disk what is much faster
- you could try to stay with RocksDB and increase KTable cache size to improve performance: https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html
在我的例子中,我正在执行 hopping window
例如(100sec,1sec)
。
KTable<Windowed<String>, aggrTest> WinMinMax = Records.groupByKey().aggregate(new aggrTestInitilizer(),
new minMaxCalculator()
, TimeWindows.of(TimeUnit.SECONDS.toMillis(100)).advanceBy(TimeUnit.SECONDS.toMillis(1)),aggrMessageSerde);
但是这里 100 秒 window 的消息数量非常大,这导致 window 执行需要很长时间。因此,要缩短此 window 执行时间,
- 我想避免将数据写入中间聚合状态存储(kafka 流默认写入)。
- 此外,如果 (1) 不可能,那么我们可以将 window 生成的中间聚合状态存储在 RAM 而不是磁盘中吗?(相同的设置是什么?)
- 关于改进 window 执行时间的任何进一步建议?
不确定您是如何实现 MinMaxCalculator()
的,但我认为它只是将当前 min/max 与新值进行比较。因此,商店仅包含当前聚合。 -- 因此,window 大小根本无关紧要,因为与 window 大小无关,您只存储键和当前聚合结果。
回答您的问题:
- 根据设计,聚合需要一个存储来保存当前的聚合结果——因此,您不能删除一个存储。
- 是的,您可以使用内存存储。
aggregate()
方法有一个重载,允许您放置自定义存储,并且有实用程序 类 来创建内存存储。查看文档(顺便说一句:那些 API 在 Kafka 1.0 中得到了很多简化,所以如果你还没有使用 1.0,我建议升级):https://docs.confluent.io/current/streams/developer-guide/processor-api.html#enable-or-disable-fault-tolerance-of-state-stores-store-changelogs - 如上所述,window 的大小不会影响您的计算 "speed"
Side notes:
- if you go with in-memory state instead of RocksDB, you limit the store size to your RAM size -- this might become a problem if your retention time is large as state can become quite big
- if you do rolling bounces, this will take more time as the state needs to be recreated by reading the full changelog topic -- RocksDB stores can recover from local disk what is much faster
- you could try to stay with RocksDB and increase KTable cache size to improve performance: https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html