如何在 Kafka Streams 中 remove/clear 状态存储?

How to remove/clear state stores in Kafka Streams?

我在我的 kafka-streams DSL 末尾有一个自定义 Transformer 实现,并绑定了一个持久的变更日志 KeyValueStore

几周以来,我一直在商店中放置太多数据。现在,每当我加载应用程序时,它都会占用太多内存。

但是,应用程序本身只是一个原型,所以我不介意完全清理商店。

我可以重命名 kafka.application.idstate-store-name 但这是一个临时解决方法(相应的 data/topics 不会被删除)。

如何彻底清除它?

Confluent 的 documentation recommends 要么使用 KafkaStreams.cleanUp(),要么手动删除位于 /var/lib/kafka-streams/<application.id> 的目录(配置参数 state.dir)。

您还需要使用 special reset tool - bin/kafka-streams-application-reset:

重置应用程序使用的所有主题
bin/kafka-streams-application-reset --application-id my-streams-app \
                                  --input-topics my-input-topic \
                                  --intermediate-topics rekeyed-topic

这个关于重置状态的post很有意思