Kafka 流 GlobalKTable 在 Tombstone 上抛出反序列化异常 - 空值 - 记录
Kafka streams GlobalKTable throws Deserialization exception on Tombstone - null value- records
我有一个 Spring 基于云流的 Kafka Streams 应用程序,我在其中将一个 Global KTable 绑定到一个 Compact 主题。当我将墓碑记录推送到主题(具有空值的非空键)时 - 我的 Kafka 流应用程序因反序列化异常而失败。失败是因为我的反序列化器不处理空记录。
根据文档,我认为 GlobalKTable 甚至不会“看到”空值记录。不是这样吗?我需要在反序列化器中处理空记录吗?
org.apache.kafka.common.errors.SerializationException: Unable to deserialize
Caused by: java.lang.IllegalArgumentException: argument "src" is null
at com.fasterxml.jackson.databind.ObjectMapper._assertNotNull(ObjectMapper.java:4693)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3511)
at common.domain.serdes.MySerde$MyDeserializer.deserialize(MySerde.java:47)
at common.domain.serdes.MySerde$MyDeserializer.deserialize(MySerde.java:39)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
at org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.update(GlobalStateUpdateTask.java:91)
at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:240)
at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:289)
是;您必须检查 null
和 return null
。请参阅任何标准反序列化器。
与 KafkaConsumer
的 Fetcher
(在调用前检查 null
)不同,kafka-streams
无条件地调用它。参见
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
我有一个 Spring 基于云流的 Kafka Streams 应用程序,我在其中将一个 Global KTable 绑定到一个 Compact 主题。当我将墓碑记录推送到主题(具有空值的非空键)时 - 我的 Kafka 流应用程序因反序列化异常而失败。失败是因为我的反序列化器不处理空记录。
根据文档,我认为 GlobalKTable 甚至不会“看到”空值记录。不是这样吗?我需要在反序列化器中处理空记录吗?
org.apache.kafka.common.errors.SerializationException: Unable to deserialize
Caused by: java.lang.IllegalArgumentException: argument "src" is null
at com.fasterxml.jackson.databind.ObjectMapper._assertNotNull(ObjectMapper.java:4693)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3511)
at common.domain.serdes.MySerde$MyDeserializer.deserialize(MySerde.java:47)
at common.domain.serdes.MySerde$MyDeserializer.deserialize(MySerde.java:39)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
at org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.update(GlobalStateUpdateTask.java:91)
at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:240)
at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:289)
是;您必须检查 null
和 return null
。请参阅任何标准反序列化器。
与 KafkaConsumer
的 Fetcher
(在调用前检查 null
)不同,kafka-streams
无条件地调用它。参见
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)