Kafka Streams Rocksdb 保留没有使用窗口功能删除旧数据

Kafka Streams Rocksdb retention didn't remove old data with windowed function

我是 运行 具有 windowed 功能的 Kafka 流应用程序。但是24小时后运行,本地磁盘使用量从5G增加到20G,并且还在不断增加。根据我搜索的内容,一旦我引入 windowedBy,它应该会自动删除旧数据。

我的拓扑如下所示:

stream.selectKey(selectKey A)
.groupByKey(..)
.windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
.reduce((value1,value2) -> value2)
.suppress.toStreams()
.selectKey(selectKey B).mapValues().filter()
.groupByKey().reduce.toStream().to()

我无法理解的一件事是,从这个拓扑结构中,它将创建两个内部重新分区主题,作为两个 groupBy 操作的 repartition-03repartition-14。从磁盘上看,所有执行 repartition-03 任务的机器都具有很高的磁盘使用率,并且似乎从不删除旧数据,而执行 运行 repartition-14 任务的机器总是磁盘使用率较低。

当我登录机器时,我发现这两台机器的路径不同,如下所示:

/tmp/kafka-streams/test-group/2_40/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000014
/tmp/kafka-streams/test-group/1_4/KSTREAM-REDUCE-STATE-STORE-0000000003/KSTREAM-REDUCE-STATE-STORE-0000000003.1568808000000

为什么他们有不同的路径? 2_40 用于 repartition-14 任务,路径中有 rocksdb,而另一个不包含 rocksdb。同时,taks 1_4 保留几个文件夹,如 KSTREAM-REDUCE-STATE-STORE-0000000003.1568808000000 但具有不同的后缀。

虽然我引入了windowedBy函数,rocksdb会在window过期时删除旧数据?为什么以上两个内部重新分区主题具有不同的路径和保留行为?

非常感谢任何帮助!谢谢!

默认保留期为 24 小时。您可以通过

减少它
.reduce(..., Materialized.with(...).withRetention(...));