Kafka Streams NPE in MeteredKeyValueStore
I am trying to run a very basic stream using the Processor API in Scala.
class KafkaProcessor extends Processor[String, GenericRecord] {
  private var kvStore: KeyValueStore[String, GenericRecord] = _

  override def init(processorContext: ProcessorContext): Unit = {
    this.kvStore = Stores
      .keyValueStoreBuilder(
        Stores.persistentKeyValueStore("random-mame"),
        Serdes.String,
        new GenericAvroSerde
      )
  }

  override def process(
    key: String,
    value: GenericRecord
  ): Unit = {
    val currentState = Option(kvStore.get(key)) // NPE
    ...
  }
}
The error log shows an NPE thrown from somewhere inside Kafka Streams:
Exception in thread "test-4294024b-1390-4c2f-ba8e-e520cca728ff-StreamThread-1" java.lang.NullPointerException
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:134)
    at writeside.kafka.AggregateKafkaProcessor.process(KafkaProcessor.scala:64)
    at writeside.kafka.AggregateKafkaProcessor.process(KafkaProcessor.scala:35)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:351)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:104)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:413)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:862)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
It seems to be related to getTime inside MeteredKeyValueStore. I am not sure how this happens or how I can prevent it.
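If you want to use a store, you need to declare it outside the processor: add the store to the StreamsBuilder, then connect it to the processor (also via the StreamsBuilder).
Inside the processor, you use the ProcessorContext to get a handle on the store.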
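A minimal sketch of that wiring, assuming the Kafka Streams 2.x Processor API that the question uses (newer releases favor the org.apache.kafka.streams.processor.api package). The store name aggregate-store, the topic input-topic, the AppendingProcessor class and the String value type are placeholders chosen for illustration; in the question's setup the value serde would be a configured GenericAvroSerde instead.

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{StreamsBuilder, Topology}
import org.apache.kafka.streams.processor.{Processor, ProcessorContext, ProcessorSupplier}
import org.apache.kafka.streams.state.{KeyValueStore, StoreBuilder, Stores}

object StoreWiring {
  // Hypothetical store name; it must match in all three places below.
  val StoreName = "aggregate-store"

  class AppendingProcessor extends Processor[String, String] {
    private var kvStore: KeyValueStore[String, String] = _

    override def init(context: ProcessorContext): Unit = {
      // Look up the store that Kafka Streams created and initialized,
      // instead of building a new one here.
      kvStore = context
        .getStateStore(StoreName)
        .asInstanceOf[KeyValueStore[String, String]]
    }

    override def process(key: String, value: String): Unit = {
      // get() returns null for an unknown key, hence the Option wrapper.
      val currentState = Option(kvStore.get(key))
      kvStore.put(key, currentState.fold(value)(_ + "," + value))
    }

    override def close(): Unit = ()
  }

  // Step 1: declare the store outside the processor.
  val storeBuilder: StoreBuilder[KeyValueStore[String, String]] =
    Stores.keyValueStoreBuilder(
      Stores.persistentKeyValueStore(StoreName),
      Serdes.String(),
      Serdes.String()
    )

  def buildTopology(): Topology = {
    val builder = new StreamsBuilder()

    // Step 2: add the store to the StreamsBuilder.
    builder.addStateStore(storeBuilder)

    // Step 3: connect the store to the processor by name.
    // The topic is read with the default serdes from the application
    // config, assumed here to be String serdes.
    builder
      .stream[String, String]("input-topic")
      .process(new ProcessorSupplier[String, String] {
        override def get(): Processor[String, String] = new AppendingProcessor
      }, StoreName)

    builder.build()
  }
}
```

Passing the store name to process() is what gives the processor access to the store; if it is left out, the getStateStore lookup in init() is expected to fail rather than return a usable store.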
For details, see the documentation: https://kafka.apache.org/21/documentation/streams/developer-guide/processor-api.html