避免写入 Kafka Streams 中的中间聚合存储

Avoid Writing into intemediate Aggregate Store in Kafka Streams

在我的例子中,我正在执行 hopping window 例如(100sec,1sec)

KTable<Windowed<String>, aggrTest> WinMinMax = Records.groupByKey().aggregate(new aggrTestInitilizer(), 
        new minMaxCalculator()
        , TimeWindows.of(TimeUnit.SECONDS.toMillis(100)).advanceBy(TimeUnit.SECONDS.toMillis(1)),aggrMessageSerde);

但是这里 100 秒 window 的消息数量非常大,这导致 window 执行需要很长时间。因此,要缩短此 window 执行时间,

  1. 我想避免将数据写入中间聚合状态存储(kafka 流默认写入)。
  2. 此外,如果 (1) 不可能,那么我们可以将 window 生成的中间聚合状态存储在 RAM 而不是磁盘中吗?(相同的设置是什么?)
  3. 关于改进 window 执行时间的任何进一步建议?

不确定您是如何实现 MinMaxCalculator() 的,但我认为它只是将当前 min/max 与新值进行比较。因此,商店仅包含当前聚合。 -- 因此,window 大小根本无关紧要,因为与 window 大小无关,您只存储键和当前聚合结果。

回答您的问题:

  1. 根据设计,聚合需要一个存储来保存当前的聚合结果——因此,您不能删除一个存储。
  2. 是的,您可以使用内存存储。 aggregate() 方法有一个重载,允许您放置自定义存储,并且有实用程序 类 来创建内存存储。查看文档(顺便说一句:那些 API 在 Kafka 1.0 中得到了很多简化,所以如果你还没有使用 1.0,我建议升级):https://docs.confluent.io/current/streams/developer-guide/processor-api.html#enable-or-disable-fault-tolerance-of-state-stores-store-changelogs
  3. 如上所述,window 的大小不会影响您的计算 "speed"

Side notes:

  • if you go with in-memory state instead of RocksDB, you limit the store size to your RAM size -- this might become a problem if your retention time is large as state can become quite big
  • if you do rolling bounces, this will take more time as the state needs to be recreated by reading the full changelog topic -- RocksDB stores can recover from local disk what is much faster
  • you could try to stay with RocksDB and increase KTable cache size to improve performance: https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html