在 Kafka Streams 中查询 Global State Store 抛出 Null 异常
Querying a Global State Store in Kafka Streams throws a Null exception
当使用处理器 API 和 addGlobalStore
函数创建全局 KTable 时,生成的存储填充 OK。但是随后尝试迭代商店的内容会导致以下异常:
Exception in thread "main" java.lang.NullPointerException at org.apache.kafka.streams.state.internals.SerializedKeyValueIterator.next(SerializedKeyValueIterator.java:63) at org.apache.kafka.streams.state.internals.SerializedKeyValueIterator.next(SerializedKeyValueIterator.java:26) at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueIterator.next(MeteredKeyValueStore.java:208) at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueIterator.next(MeteredKeyValueStore.java:189) at org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore$CompositeKeyValueIterator.next(CompositeReadOnlyKeyValueStore.java:155) at org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore$CompositeKeyValueIterator.next(CompositeReadOnlyKeyValueStore.java:113) at org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIterator.hasNext(DelegatingPeekingKeyValueIterator.java:63)
这是来自遍历商店的简单代码:
ReadOnlyKeyValueStore<String, StreamConfig> store = this.globalStreams.store("config-table",QueryableStoreTypes.<String, Config>keyValueStore());
final KeyValueIterator<String, Config> iteratble = store.all();
HashMap<String, Config> dynamicStreams = new HashMap<String,Config>();
while (iteratble.hasNext()) {
final KeyValue<String, Config> next = iteratble.next();
dynamicStreams.put(next.key, next.value);
}
iteratble.close();
对该存储中条目的任何访问都会导致该异常。如果我使用 globalTable
函数创建全局状态存储,则同一主题工作正常(因此这不是反序列化问题)。状态存储还 returns 行数成功(因此它 是 完全填充)。
全局状态存储创建如下,这是使用 Kafka Streams 0.10.2.1.
KStreamBuilder globalBuilder = new KStreamBuilder();
StateStoreSupplier<KeyValueStore<String, Config>> storeSupplier = Stores
.create("config-table")
.withKeys(Serdes.String())
.withValues(configSerdes)
.persistent()
.build();
// a Processor that updates the store
ProcessorSupplier<String, Config> procSupplier = () -> new ConfigWorker();
globalBuilder.addGlobalStore(
storeSupplier.get(),
"config-table-source",
new StringDeserializer(),
configDeserializer,
"config",
"config-worker",
procSupplier)
.buildGlobalStateTopology();
this.globalStreams = new KafkaStreams(
globalBuilder,
this.getProperties());
globalStreams.start();
编辑:
一个问题是默认情况下会记录状态存储,并且全局 KTables 没有更改日志支持。所以在状态存储创建中添加 .disableLogging
修复了这个问题。
另一个问题似乎出在处理器的 init
函数中对 context.schedule
的调用中。这引发了无效操作异常。删除这个修复了代码,但我猜 punctuate
现在从未被调用过。它现在似乎可以工作了——但不清楚为什么我不能打电话给 schedule
.
根据这个 code,似乎不允许在全局上下文中转发。所以我的全局处理器只需要处理入站更新。
当使用处理器 API 和 addGlobalStore
函数创建全局 KTable 时,生成的存储填充 OK。但是随后尝试迭代商店的内容会导致以下异常:
Exception in thread "main" java.lang.NullPointerException at org.apache.kafka.streams.state.internals.SerializedKeyValueIterator.next(SerializedKeyValueIterator.java:63) at org.apache.kafka.streams.state.internals.SerializedKeyValueIterator.next(SerializedKeyValueIterator.java:26) at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueIterator.next(MeteredKeyValueStore.java:208) at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueIterator.next(MeteredKeyValueStore.java:189) at org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore$CompositeKeyValueIterator.next(CompositeReadOnlyKeyValueStore.java:155) at org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore$CompositeKeyValueIterator.next(CompositeReadOnlyKeyValueStore.java:113) at org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIterator.hasNext(DelegatingPeekingKeyValueIterator.java:63)
这是来自遍历商店的简单代码:
ReadOnlyKeyValueStore<String, StreamConfig> store = this.globalStreams.store("config-table",QueryableStoreTypes.<String, Config>keyValueStore());
final KeyValueIterator<String, Config> iteratble = store.all();
HashMap<String, Config> dynamicStreams = new HashMap<String,Config>();
while (iteratble.hasNext()) {
final KeyValue<String, Config> next = iteratble.next();
dynamicStreams.put(next.key, next.value);
}
iteratble.close();
对该存储中条目的任何访问都会导致该异常。如果我使用 globalTable
函数创建全局状态存储,则同一主题工作正常(因此这不是反序列化问题)。状态存储还 returns 行数成功(因此它 是 完全填充)。
全局状态存储创建如下,这是使用 Kafka Streams 0.10.2.1.
KStreamBuilder globalBuilder = new KStreamBuilder();
StateStoreSupplier<KeyValueStore<String, Config>> storeSupplier = Stores
.create("config-table")
.withKeys(Serdes.String())
.withValues(configSerdes)
.persistent()
.build();
// a Processor that updates the store
ProcessorSupplier<String, Config> procSupplier = () -> new ConfigWorker();
globalBuilder.addGlobalStore(
storeSupplier.get(),
"config-table-source",
new StringDeserializer(),
configDeserializer,
"config",
"config-worker",
procSupplier)
.buildGlobalStateTopology();
this.globalStreams = new KafkaStreams(
globalBuilder,
this.getProperties());
globalStreams.start();
编辑:
一个问题是默认情况下会记录状态存储,并且全局 KTables 没有更改日志支持。所以在状态存储创建中添加
.disableLogging
修复了这个问题。另一个问题似乎出在处理器的
init
函数中对context.schedule
的调用中。这引发了无效操作异常。删除这个修复了代码,但我猜punctuate
现在从未被调用过。它现在似乎可以工作了——但不清楚为什么我不能打电话给schedule
.
根据这个 code,似乎不允许在全局上下文中转发。所以我的全局处理器只需要处理入站更新。