反序列化错误并记录分区、主题和偏移量
Deserialisation error and logging the partition, topic and offset
我正在使用我的 DefaultKafkaConsumerFactory 上发送的 ErrorHandlingDeserialiser 处理反序列化错误。
我有自定义代码
try (ErrorHandlingDeserializer<MyEvent> errorHandlingDeserializer = new ErrorHandlingDeserializer<>(theRealDeserialiser)) {
errorHandlingDeserializer.setFailedDeserializationFunction(myCustomFunction::apply);
return new DefaultKafkaConsumerFactory<>(getConsumerProperties(), consumerKeyDeserializer, errorHandlingDeserializer);
}
我的自定义函数进行一些处理并发布到毒丸主题和 returns null.
当发生反序列化错误时,我想记录主题、分区和偏移量。我能想到的唯一方法是停止 return 函数中的 null 和 return MyEvent 的新子类型。然后我的 KafkaListener 可以询问新的子类型。
我有一个 @KafkaListener 组件,它监听 ConsumerRecord 如下:
@KafkaListner(....)
public void onMessage(ConsumerRecord<String, MyEvent> record) {
...
...
// if record.value instance of MyNewSubType
// I have access to the topic, partition and offset here, so I could log it here
// I'd have to check that the instance of MyEvent is actually my sub type representing a failed record.
}
是这样吗?我知道 null 有特殊含义 Kafka.
这种子类型方法的缺点是,我必须使用 ErrorHandlingDeserialiser 创建每个类型的子类型。
不使用函数;相反,抛出的 DeserializationException
直接传递给容器的 ErrorHandler
.
SeekToCurrentErrorHandler
认为这些异常是致命的并且不会重试它们,它将记录传递给恢复器。
提供了一个发送记录的DeadLetterPublishingRecoverer
。
见https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-error-handling
和
https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters
我正在使用我的 DefaultKafkaConsumerFactory 上发送的 ErrorHandlingDeserialiser 处理反序列化错误。
我有自定义代码
try (ErrorHandlingDeserializer<MyEvent> errorHandlingDeserializer = new ErrorHandlingDeserializer<>(theRealDeserialiser)) {
errorHandlingDeserializer.setFailedDeserializationFunction(myCustomFunction::apply);
return new DefaultKafkaConsumerFactory<>(getConsumerProperties(), consumerKeyDeserializer, errorHandlingDeserializer);
}
我的自定义函数进行一些处理并发布到毒丸主题和 returns null.
当发生反序列化错误时,我想记录主题、分区和偏移量。我能想到的唯一方法是停止 return 函数中的 null 和 return MyEvent 的新子类型。然后我的 KafkaListener 可以询问新的子类型。
我有一个 @KafkaListener 组件,它监听 ConsumerRecord 如下:
@KafkaListner(....)
public void onMessage(ConsumerRecord<String, MyEvent> record) {
...
...
// if record.value instance of MyNewSubType
// I have access to the topic, partition and offset here, so I could log it here
// I'd have to check that the instance of MyEvent is actually my sub type representing a failed record.
}
是这样吗?我知道 null 有特殊含义 Kafka.
这种子类型方法的缺点是,我必须使用 ErrorHandlingDeserialiser 创建每个类型的子类型。
不使用函数;相反,抛出的 DeserializationException
直接传递给容器的 ErrorHandler
.
SeekToCurrentErrorHandler
认为这些异常是致命的并且不会重试它们,它将记录传递给恢复器。
提供了一个发送记录的DeadLetterPublishingRecoverer
。
见https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-error-handling
和
https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters