为什么我在检索商店进行查询时偶尔会收到 InvalidStateStoreException PARTITIONS_REVOKED,而不是 运行?
Why am I occasionally getting a InvalidStateStoreException PARTITIONS_REVOKED, not RUNNING when retrieving a store to query it?
我正在访问状态存储来查询它,并且不得不用 try/catch 块包装 store()
语句以重试它,因为有时我会收到此异常:
org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get state store customers-store because the stream thread is PARTITIONS_REVOKED, not RUNNING
at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:49)
at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:57)
at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1053)
at com.codependent.kafkastreams.customer.service.CustomerService.getCustomer(CustomerService.kt:75)
at com.codependent.kafkastreams.customer.service.CustomerServiceKt.main(CustomerService.kt:108)
这是用于检索商店的代码(完整代码在 github repo 上):
fun getCustomer(id: String): Customer? {
var keyValueStore: ReadOnlyKeyValueStore<String, Customer>? = null
while(keyValueStore == null) {
try {
keyValueStore = streams.store(CUSTOMERS_STORE, QueryableStoreTypes.keyValueStore<String, Customer>())
} catch (ex: InvalidStateStoreException) {
ex.printStackTrace()
}
}
val customer = keyValueStore.get(id)
return customer
}
这是主程序:
fun main(args: Array<String>) {
val customerService = CustomerService("main", "localhost:9092")
customerService.initializeStreams()
customerService.createCustomer(Customer("53", "Joey"))
val customer = customerService.getCustomer("53")
println(customer)
customerService.stopStreams()
}
异常随机发生 运行 程序多次,在之前的执行完成后。注意:我没有对正在执行的 Kafka 集群做任何事情并使用它的默认配置。
在您访问存储时,Kafka Streams 应用程序正在重新平衡,此时无法访问状态存储。您要确保仅在应用程序状态为 运行 而不是 REBALANCING 时才查询商店。
您可以做的是在尝试像这样从商店中读取之前检查应用程序的状态:
if(streams.state() == State.RUNNING) {
keyValueStore = streams.store(...);
val customer = keyValueStore.get(id);
return customer;
}
还有一个 KafkaStreams.setStateListener
方法可用于注册 KafkStreams.StateListener
实现。每次应用程序更改其状态时都会调用 StateListener.onChange
方法。
我正在访问状态存储来查询它,并且不得不用 try/catch 块包装 store()
语句以重试它,因为有时我会收到此异常:
org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get state store customers-store because the stream thread is PARTITIONS_REVOKED, not RUNNING
at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:49)
at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:57)
at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1053)
at com.codependent.kafkastreams.customer.service.CustomerService.getCustomer(CustomerService.kt:75)
at com.codependent.kafkastreams.customer.service.CustomerServiceKt.main(CustomerService.kt:108)
这是用于检索商店的代码(完整代码在 github repo 上):
fun getCustomer(id: String): Customer? {
var keyValueStore: ReadOnlyKeyValueStore<String, Customer>? = null
while(keyValueStore == null) {
try {
keyValueStore = streams.store(CUSTOMERS_STORE, QueryableStoreTypes.keyValueStore<String, Customer>())
} catch (ex: InvalidStateStoreException) {
ex.printStackTrace()
}
}
val customer = keyValueStore.get(id)
return customer
}
这是主程序:
fun main(args: Array<String>) {
val customerService = CustomerService("main", "localhost:9092")
customerService.initializeStreams()
customerService.createCustomer(Customer("53", "Joey"))
val customer = customerService.getCustomer("53")
println(customer)
customerService.stopStreams()
}
异常随机发生 运行 程序多次,在之前的执行完成后。注意:我没有对正在执行的 Kafka 集群做任何事情并使用它的默认配置。
在您访问存储时,Kafka Streams 应用程序正在重新平衡,此时无法访问状态存储。您要确保仅在应用程序状态为 运行 而不是 REBALANCING 时才查询商店。
您可以做的是在尝试像这样从商店中读取之前检查应用程序的状态:
if(streams.state() == State.RUNNING) {
keyValueStore = streams.store(...);
val customer = keyValueStore.get(id);
return customer;
}
还有一个 KafkaStreams.setStateListener
方法可用于注册 KafkStreams.StateListener
实现。每次应用程序更改其状态时都会调用 StateListener.onChange
方法。