从 ProcessorContext 获取状态存储时是否可能出现 InvalidStateStoreException?

Is InvalidStateStoreException possible when state store is obtained from ProcessorContext?

尝试从 KafkaStreams 获取本地状态存储时,如果本地 KafkaStreams 实例尚未准备好或状态存储刚刚迁移到,则可能会出现 InvalidStateStoreException 异常另一个实例(重新平衡)。

假设我们有 DSL 拓扑,其中包括由 addStateStore 添加的本地状态存储和一些过程或转换(KStream:processKStream::transform)。

以下是我的问题:

  1. 当从 Processor::initTransformer::init 内的 ProcessorContext 上下文中获取本地状态存储时,是否会抛出 InvalidStateStoreException 异常,即
KeyValueStore<ByteString, User> userStore =  
    (KeyValueStore<ByteString, User>) context.getStateStore("store_name"); 
  1. 我们存储对KeyValueStore<ByteString, User> userStore的引用,稍后用它来修改Punctuator::punctuate里面的userStore。我们是否应该担心在对这家商店的任何 put/get/delete 操作中出现 InvalidStateStoreException 异常?

Could InvalidStateStoreException exception be thrown when local state store is obtained from ProcessorContext context inside Processor::init or Transformer::init, i.e.

没有。 init() 在商店准备好之前不会被调用。因此,InvalidStateStoreException 永远不会发生。

We store the reference to KeyValueStore userStore and use it later to modify the userStore inside Punctuator::punctuate. Should we worry about getting InvalidStateStoreException exception on any put/get/delete operations with this store?

没有。您可以安全地 read/write 商店。 InvalidStateStoreException 永远不会发生。 punctuate() 由与 process() 相同的线程执行,并确保在调用 punctuate() 时存储已准备就绪。