KafkaStreams:处理 KStream-KTable Join 中的反序列化异常

KafkaStreams: Handling Deserialize exception in KStream-KTable Join

假设我们正在 KStream 和 KTable 之间进行内部连接,如下所示:

        StreamsBuilder sb = new StreamsBuilder();
        JsonSerde<SensorMetaData> sensorMetaDataJsonSerde = new JsonSerde<>(SensorMetaData.class);
        KTable<String, String> kTable = sb.stream("sensorMetadata",
                Consumed.with(Serdes.String(), sensorMetaDataJsonSerde)).toTable();

        KStream<String, String> kStream = sb.stream("sensorValues",
                Consumed.with(Serdes.String(), Serdes.String()));

        KStream<String, String> joined = kStream.join(kTable, (left, right)->{return getJoinedOutput(left, right);});

申请的几个要点:

  1. SensorMetaData 是一个 POJO

        public class SensorMetaData{        
        String sensorId;
        String sensorMetadata;      
        }
    
  2. DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG 设置为 org.apache.kafka.streams.errors.LogAndContinueExceptionHandler

  3. JsonSerde class 如果反序列化失败,将抛出 SerializationException。

当我 运行 应用程序并向两个主题发送消息时,加入按预期工作。

现在我如下更改了 SensorMetaData 的架构,并在新节点上重新部署了应用程序

public class SensorMetaData{            
String sensorId;    
MetadataTag[] metadataTags;         
}

应用程序启动后,当我向 sensorValues 主题(流主题)发送消息时,应用程序正在关闭 org.apache.kafka.common.errors.SerializationException。查看堆栈跟踪,我意识到由于 SensorMetaData 中的模式更改,它在执行连接时未能反序列化 SensorMetaData。 Deserialize 方法中的断点显示,它试图反序列化主题 "app-KSTREAM-TOTABLE-STATE-STORE-0000000002-changelog".

中的数据

所以问题是为什么应用程序关闭而不是跳过坏记录(即具有旧模式的记录),即使 DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG 设置为 org.apache.kafka.streams.errors.LogAndContinueExceptionHandler?

但是,当应用程序在读取主题 "sensorMetadata" 时遇到错误记录(即 sb.stream("sensorMetadata")),它会成功跳过带有警告的记录 "Skipping record due to deserialization error" .

为什么加入这里不跳过坏记录?如何处理这种情况。我希望应用程序跳过记录并继续 运行ning 而不是关闭。这是堆栈跟踪

at kafkastream.JsonSerde.deserialize(JsonSerde.java:51)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:54)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:27)
    at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.outerValue(MeteredKeyValueStore.java:207)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get(MeteredKeyValueStore.java:133)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:821)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.get(ProcessorContextImpl.java:465)
    at org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.get(KTableSourceValueGetterSupplier.java:49)
    at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:77)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:101)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process(StreamTask.java:383)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
INFO stream-client [app-814c1c5b-a899-4cbf-8d85-2ed6eba81ccb] State transition from ERROR to PENDING_SHUTDOWN 

Kafka 在读取 RocksDB 文件时不使用 DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG 中的处理程序(请参阅堆栈跟踪提到 class StateSerdes)。这就是为什么它适用于来自源主题的记录,但在反序列化 table.

中的数据时失败的原因

我对 Kafka 不是很有经验,但我一遍又一遍地听到:如果发生变化,将具有新格式的数据复制到另一个主题或删除数据,重置偏移量并重新处理。

在这种情况下,也许最好删除 KTable 文件,用于 ktable 的内部主题,让应用程序重新生成具有新结构的 KTable。

这个几个月前的博客解释了更多的过程或删除数据:https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/

分享一点见解:kafka 是一个非常复杂的野兽。要在生产中成功管理它,您需要构建大量工具、代码来维护它,并且(通常)更改您的部署过程以适应 Kafka。