Kafka 流状态存储 rocksdb 文件大小不会因手动删除消息而减少

Kafka stream state store rocksdb file size not decreasing on manual deletion of messages

我正在使用处理器 api 从状态存储中删除消息。删除工作成功,我通过使用 kafka 密钥调用状态存储的交互式查询来确认,但它不会减少目录 tmp/kafka-streams.

下本地磁盘上的 kafka 流文件大小
@Override
    public void init(ProcessorContext processorContext) {
        this.processorContext = processorContext;
        processorContext.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, new Punctuator() {
            @Override
            public void punctuate(long l) {
                processorContext.commit();
            }
        }); //invoke punctuate every 12 seconds
        this.statestore = (KeyValueStore<String, GenericRecord>) processorContext.getStateStore(StateStoreEnum.HEADER.getStateStore());
        log.info("Processor initialized");
    }

    @Override
    public void process(String key, GenericRecord value) {
        statestore.all().forEachRemaining(keyValue -> {
            statestore.delete(keyValue.key);
        });
    }

kafka 流目录大小

2.3M    /private/tmp/kafka-streams
3.3M    /private/tmp/kafka-streams

我是否需要任何特定配置才能控制文件大小?如果这样不行,删除kafka-streams目录可以吗?我认为它应该是安全的,因为这样的删除将从状态存储和更新日志主题中删除记录。

RocksDB 在后台进行文件压缩。因此,如果您需要更积极的压缩,您应该通过 Streams 配置参数 rocksdb.config.setter 传入自定义 RocksDBConfigSetter。有关 RockDB 的更多详细信息,请查看 RocksDB 文档。

https://docs.confluent.io/current/streams/developer-guide/config-streams.html#rocksdb-config-setter

但是,只要没有真正的问题,我就不建议更改 RocksDB 配置——这样做弊大于利。看来你们的店铺规模很小,因此,我看不出 atm 有真正的问题。

顺便说一句: 如果你进入生产环境,你应该将 state.dir 配置更改为适当的目录,即使在重新启动机器后状态也不会丢失.如果将状态放入默认 /tmp 位置,状态很可能会在机器重启后消失,并且会触发从更改日志主题进行的昂贵恢复。