如何以避免重新创建状态存储的方式重新启动 KafkaStreams 拓扑
How to restart a KafkaStreams topology in a way that avoids recreating the state store
我用 KafkaStreams DSL 和处理器 api 编写了一些代码来进行实时计算:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> logs = builder.stream(stringSerde, stringSerde, ADDCASH_TOPIC_NAME);
StateStoreSupplier countStore = Stores.create("Counts")
.withKeys(Serdes.String())
.withValues(Serdes.Long())
.persistent()
.build();
logs.process(() -> new AddCashProcessor(), countStore.name());
我想知道:当我重新启动拓扑时,Stores.create(..)
方法是否会重新创建 StateStore?
它将重用现有状态,并且仅在存在 none 时才创建和 empty/new 存储。
我用 KafkaStreams DSL 和处理器 api 编写了一些代码来进行实时计算:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> logs = builder.stream(stringSerde, stringSerde, ADDCASH_TOPIC_NAME);
StateStoreSupplier countStore = Stores.create("Counts")
.withKeys(Serdes.String())
.withValues(Serdes.Long())
.persistent()
.build();
logs.process(() -> new AddCashProcessor(), countStore.name());
我想知道:当我重新启动拓扑时,Stores.create(..)
方法是否会重新创建 StateStore?
它将重用现有状态,并且仅在存在 none 时才创建和 empty/new 存储。