Kafka 流状态存储 rocksdb 文件大小不会因手动删除消息而减少
Kafka stream state store rocksdb file size not decreasing on manual deletion of messages
我正在使用处理器 api 从状态存储中删除消息。删除工作成功,我通过使用 kafka 密钥调用状态存储的交互式查询来确认,但它不会减少目录 tmp/kafka-streams.
下本地磁盘上的 kafka 流文件大小
@Override
public void init(ProcessorContext processorContext) {
this.processorContext = processorContext;
processorContext.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, new Punctuator() {
@Override
public void punctuate(long l) {
processorContext.commit();
}
}); //invoke punctuate every 12 seconds
this.statestore = (KeyValueStore<String, GenericRecord>) processorContext.getStateStore(StateStoreEnum.HEADER.getStateStore());
log.info("Processor initialized");
}
@Override
public void process(String key, GenericRecord value) {
statestore.all().forEachRemaining(keyValue -> {
statestore.delete(keyValue.key);
});
}
kafka 流目录大小
2.3M /private/tmp/kafka-streams
3.3M /private/tmp/kafka-streams
我是否需要任何特定配置才能控制文件大小?如果这样不行,删除kafka-streams目录可以吗?我认为它应该是安全的,因为这样的删除将从状态存储和更新日志主题中删除记录。
RocksDB 在后台进行文件压缩。因此,如果您需要更积极的压缩,您应该通过 Streams 配置参数 rocksdb.config.setter
传入自定义 RocksDBConfigSetter
。有关 RockDB 的更多详细信息,请查看 RocksDB 文档。
https://docs.confluent.io/current/streams/developer-guide/config-streams.html#rocksdb-config-setter
但是,只要没有真正的问题,我就不建议更改 RocksDB 配置——这样做弊大于利。看来你们的店铺规模很小,因此,我看不出 atm 有真正的问题。
顺便说一句: 如果你进入生产环境,你应该将 state.dir
配置更改为适当的目录,即使在重新启动机器后状态也不会丢失.如果将状态放入默认 /tmp
位置,状态很可能会在机器重启后消失,并且会触发从更改日志主题进行的昂贵恢复。
我正在使用处理器 api 从状态存储中删除消息。删除工作成功,我通过使用 kafka 密钥调用状态存储的交互式查询来确认,但它不会减少目录 tmp/kafka-streams.
下本地磁盘上的 kafka 流文件大小@Override
public void init(ProcessorContext processorContext) {
this.processorContext = processorContext;
processorContext.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, new Punctuator() {
@Override
public void punctuate(long l) {
processorContext.commit();
}
}); //invoke punctuate every 12 seconds
this.statestore = (KeyValueStore<String, GenericRecord>) processorContext.getStateStore(StateStoreEnum.HEADER.getStateStore());
log.info("Processor initialized");
}
@Override
public void process(String key, GenericRecord value) {
statestore.all().forEachRemaining(keyValue -> {
statestore.delete(keyValue.key);
});
}
kafka 流目录大小
2.3M /private/tmp/kafka-streams
3.3M /private/tmp/kafka-streams
我是否需要任何特定配置才能控制文件大小?如果这样不行,删除kafka-streams目录可以吗?我认为它应该是安全的,因为这样的删除将从状态存储和更新日志主题中删除记录。
RocksDB 在后台进行文件压缩。因此,如果您需要更积极的压缩,您应该通过 Streams 配置参数 rocksdb.config.setter
传入自定义 RocksDBConfigSetter
。有关 RockDB 的更多详细信息,请查看 RocksDB 文档。
https://docs.confluent.io/current/streams/developer-guide/config-streams.html#rocksdb-config-setter
但是,只要没有真正的问题,我就不建议更改 RocksDB 配置——这样做弊大于利。看来你们的店铺规模很小,因此,我看不出 atm 有真正的问题。
顺便说一句: 如果你进入生产环境,你应该将 state.dir
配置更改为适当的目录,即使在重新启动机器后状态也不会丢失.如果将状态放入默认 /tmp
位置,状态很可能会在机器重启后消失,并且会触发从更改日志主题进行的昂贵恢复。