如何在 Kafka Streams 中 remove/clear 状态存储?
How to remove/clear state stores in Kafka Streams?
我在我的 kafka-streams DSL 末尾有一个自定义 Transformer
实现,并绑定了一个持久的变更日志 KeyValueStore
。
几周以来,我一直在商店中放置太多数据。现在,每当我加载应用程序时,它都会占用太多内存。
但是,应用程序本身只是一个原型,所以我不介意完全清理商店。
我可以重命名 kafka.application.id
和 state-store-name
但这是一个临时解决方法(相应的 data/topics 不会被删除)。
如何彻底清除它?
Confluent 的 documentation recommends 要么使用 KafkaStreams.cleanUp(),要么手动删除位于 /var/lib/kafka-streams/<application.id>
的目录(配置参数 state.dir
)。
您还需要使用 special reset tool - bin/kafka-streams-application-reset
:
重置应用程序使用的所有主题
bin/kafka-streams-application-reset --application-id my-streams-app \
--input-topics my-input-topic \
--intermediate-topics rekeyed-topic
这个关于重置状态的post很有意思
我在我的 kafka-streams DSL 末尾有一个自定义 Transformer
实现,并绑定了一个持久的变更日志 KeyValueStore
。
几周以来,我一直在商店中放置太多数据。现在,每当我加载应用程序时,它都会占用太多内存。
但是,应用程序本身只是一个原型,所以我不介意完全清理商店。
我可以重命名 kafka.application.id
和 state-store-name
但这是一个临时解决方法(相应的 data/topics 不会被删除)。
如何彻底清除它?
Confluent 的 documentation recommends 要么使用 KafkaStreams.cleanUp(),要么手动删除位于 /var/lib/kafka-streams/<application.id>
的目录(配置参数 state.dir
)。
您还需要使用 special reset tool - bin/kafka-streams-application-reset
:
bin/kafka-streams-application-reset --application-id my-streams-app \
--input-topics my-input-topic \
--intermediate-topics rekeyed-topic
这个关于重置状态的post很有意思