反序列化错误并记录分区、主题和偏移量

Deserialisation error and logging the partition, topic and offset

我正在使用我的 DefaultKafkaConsumerFactory 上发送的 ErrorHandlingDeserialiser 处理反序列化错误。

我有自定义代码

try (ErrorHandlingDeserializer<MyEvent> errorHandlingDeserializer = new ErrorHandlingDeserializer<>(theRealDeserialiser)) {
            errorHandlingDeserializer.setFailedDeserializationFunction(myCustomFunction::apply);
            return new DefaultKafkaConsumerFactory<>(getConsumerProperties(), consumerKeyDeserializer, errorHandlingDeserializer);
        }

我的自定义函数进行一些处理并发布到毒丸主题和 returns null.

当发生反序列化错误时,我想记录主题、分区和偏移量。我能想到的唯一方法是停止 return 函数中的 null 和 return MyEvent 的新子类型。然后我的 KafkaListener 可以询问新的子类型。

我有一个 @KafkaListener 组件,它监听 Co​​nsumerRecord 如下:

 @KafkaListner(....)
 public void onMessage(ConsumerRecord<String, MyEvent> record) {
   ...
   ...
   
// if record.value instance of MyNewSubType
//   I have access to the topic, partition and offset here, so I could log it here
// I'd have to check that the instance of MyEvent is actually my sub type representing a failed record.

 }

是这样吗?我知道 null 有特殊含义 Kafka.

这种子类型方法的缺点是,我必须使用 ErrorHandlingDeserialiser 创建每个类型的子类型。

不使用函数;相反,抛出的 DeserializationException 直接传递给容器的 ErrorHandler.

SeekToCurrentErrorHandler 认为这些异常是致命的并且不会重试它们,它将记录传递给恢复器。

提供了一个发送记录的DeadLetterPublishingRecoverer

https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-error-handling

https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters