从 ProcessorContext 获取状态存储时是否可能出现 InvalidStateStoreException?
Is InvalidStateStoreException possible when state store is obtained from ProcessorContext?
尝试从 KafkaStreams
获取本地状态存储时,如果本地 KafkaStreams
实例尚未准备好或状态存储刚刚迁移到,则可能会出现 InvalidStateStoreException
异常另一个实例(重新平衡)。
假设我们有 DSL 拓扑,其中包括由 addStateStore
添加的本地状态存储和一些过程或转换(KStream:process
或 KStream::transform
)。
以下是我的问题:
- 当从
Processor::init
或 Transformer::init
内的 ProcessorContext
上下文中获取本地状态存储时,是否会抛出 InvalidStateStoreException
异常,即
KeyValueStore<ByteString, User> userStore =
(KeyValueStore<ByteString, User>) context.getStateStore("store_name");
- 我们存储对
KeyValueStore<ByteString, User> userStore
的引用,稍后用它来修改Punctuator::punctuate
里面的userStore
。我们是否应该担心在对这家商店的任何 put/get/delete 操作中出现 InvalidStateStoreException
异常?
Could InvalidStateStoreException exception be thrown when local state store is obtained from ProcessorContext context inside Processor::init or Transformer::init, i.e.
没有。 init()
在商店准备好之前不会被调用。因此,InvalidStateStoreException
永远不会发生。
We store the reference to KeyValueStore userStore and use it later to modify the userStore inside Punctuator::punctuate. Should we worry about getting InvalidStateStoreException exception on any put/get/delete operations with this store?
没有。您可以安全地 read/write 商店。 InvalidStateStoreException
永远不会发生。 punctuate()
由与 process()
相同的线程执行,并确保在调用 punctuate()
时存储已准备就绪。
尝试从 KafkaStreams
获取本地状态存储时,如果本地 KafkaStreams
实例尚未准备好或状态存储刚刚迁移到,则可能会出现 InvalidStateStoreException
异常另一个实例(重新平衡)。
假设我们有 DSL 拓扑,其中包括由 addStateStore
添加的本地状态存储和一些过程或转换(KStream:process
或 KStream::transform
)。
以下是我的问题:
- 当从
Processor::init
或Transformer::init
内的ProcessorContext
上下文中获取本地状态存储时,是否会抛出InvalidStateStoreException
异常,即
KeyValueStore<ByteString, User> userStore =
(KeyValueStore<ByteString, User>) context.getStateStore("store_name");
- 我们存储对
KeyValueStore<ByteString, User> userStore
的引用,稍后用它来修改Punctuator::punctuate
里面的userStore
。我们是否应该担心在对这家商店的任何 put/get/delete 操作中出现InvalidStateStoreException
异常?
Could InvalidStateStoreException exception be thrown when local state store is obtained from ProcessorContext context inside Processor::init or Transformer::init, i.e.
没有。 init()
在商店准备好之前不会被调用。因此,InvalidStateStoreException
永远不会发生。
We store the reference to KeyValueStore userStore and use it later to modify the userStore inside Punctuator::punctuate. Should we worry about getting InvalidStateStoreException exception on any put/get/delete operations with this store?
没有。您可以安全地 read/write 商店。 InvalidStateStoreException
永远不会发生。 punctuate()
由与 process()
相同的线程执行,并确保在调用 punctuate()
时存储已准备就绪。