Kafka Streams persistent store error: the state store, may have migrated to another instance
Kafka Streams persistent store error: the state store, may have migrated to another instance
我正在使用带有 Spring Boot 的 Kafka Streams。在我的用例中,当我收到来自其他微服务的客户事件时,我需要存储在 customer 物化视图中,当我收到 order 事件时,我需要加入客户和订单,然后存储在 customer-order 实体化视图中。为此,我创建了持久键值存储 customer-store 和 updating this when a new event comes。
StoreBuilder customerStateStore = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("customer"),Serdes.String(), customerSerde).withLoggingEnabled(new HashMap<>());
streamsBuilder.addStateStore(customerStateStore);
KTable<String,Customer> customerKTable=streamsBuilder.table("customer",Consumed.with(Serdes.String(),customerSerde));
customerKTable.foreach(((key, value) -> System.out.println("Customer from Topic: "+value)));
已配置拓扑、流并启动流对象。当我尝试使用 ReadOnlyKeyValueStore 访问存储时,出现以下异常,即使我刚才存储了一些对象
streams.start();
ReadOnlyKeyValueStore<String, Customer> customerStore = streams.store("customer", QueryableStoreTypes.keyValueStore());
System.out.println("customerStore.approximateNumEntries()-> " + customerStore.approximateNumEntries());
代码上传到Github以供参考。感谢您的帮助。
异常:
org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, customer, may have migrated to another instance.
at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60)
at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1043)
at com.kafkastream.service.EventsListener.main(EventsListener.java:94)
状态存储通常需要一些时间来准备。最简单的方法如下所示。 (代码来自官方文档)
public static <T> T waitUntilStoreIsQueryable(final String storeName,
final QueryableStoreType<T> queryableStoreType,
final KafkaStreams streams) throws InterruptedException {
while (true) {
try {
return streams.store(storeName, queryableStoreType);
} catch (InvalidStateStoreException ignored) {
// store not yet ready for querying
Thread.sleep(100);
}
}
}
您可以在文档中找到更多信息。
https://docs.confluent.io/current/streams/faq.html#interactive-queries
我正在使用带有 Spring Boot 的 Kafka Streams。在我的用例中,当我收到来自其他微服务的客户事件时,我需要存储在 customer 物化视图中,当我收到 order 事件时,我需要加入客户和订单,然后存储在 customer-order 实体化视图中。为此,我创建了持久键值存储 customer-store 和 updating this when a new event comes。
StoreBuilder customerStateStore = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("customer"),Serdes.String(), customerSerde).withLoggingEnabled(new HashMap<>());
streamsBuilder.addStateStore(customerStateStore);
KTable<String,Customer> customerKTable=streamsBuilder.table("customer",Consumed.with(Serdes.String(),customerSerde));
customerKTable.foreach(((key, value) -> System.out.println("Customer from Topic: "+value)));
已配置拓扑、流并启动流对象。当我尝试使用 ReadOnlyKeyValueStore 访问存储时,出现以下异常,即使我刚才存储了一些对象
streams.start();
ReadOnlyKeyValueStore<String, Customer> customerStore = streams.store("customer", QueryableStoreTypes.keyValueStore());
System.out.println("customerStore.approximateNumEntries()-> " + customerStore.approximateNumEntries());
代码上传到Github以供参考。感谢您的帮助。
异常:
org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, customer, may have migrated to another instance.
at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60)
at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1043)
at com.kafkastream.service.EventsListener.main(EventsListener.java:94)
状态存储通常需要一些时间来准备。最简单的方法如下所示。 (代码来自官方文档)
public static <T> T waitUntilStoreIsQueryable(final String storeName,
final QueryableStoreType<T> queryableStoreType,
final KafkaStreams streams) throws InterruptedException {
while (true) {
try {
return streams.store(storeName, queryableStoreType);
} catch (InvalidStateStoreException ignored) {
// store not yet ready for querying
Thread.sleep(100);
}
}
}
您可以在文档中找到更多信息。 https://docs.confluent.io/current/streams/faq.html#interactive-queries