使用ErrorHandlingDeserializer2处理反序列化异常时如何捕获异常和message key
How to capture the exception and message key when using ErrorHandlingDeserializer2 to handle exceptions during deserialization
我正在使用 spring boot 2.1.7.RELEASE 和 spring-kafka 2.2.8.RELEASE.And 我正在使用 @KafkaListener 注释创建一个消费者,我我正在使用 consumer.And 的所有默认设置,我正在使用 Spring-Kafka 文档中指定的以下配置。
// other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, AvroDeserializer.class.getName());
return new DefaultKafkaConsumerFactory<>(props);
现在,我的要求是出现反序列化异常时。请建议我该怎么做。
- 捕获异常并记录密钥以记录详细信息
- 将此记录发送到 DLT
添加 SeekToCurrentErrorHandler
,配置 DeadLetterPublishingRecoverer
到容器。
见the documentation and here。
反序列化异常被视为致命异常,不会重试。
如果您出于某种原因想要编写自己的错误处理程序,DeadLetterPublishingRecoverer
中的代码显示了如何提取异常信息。
我正在使用 spring boot 2.1.7.RELEASE 和 spring-kafka 2.2.8.RELEASE.And 我正在使用 @KafkaListener 注释创建一个消费者,我我正在使用 consumer.And 的所有默认设置,我正在使用 Spring-Kafka 文档中指定的以下配置。
// other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, AvroDeserializer.class.getName());
return new DefaultKafkaConsumerFactory<>(props);
现在,我的要求是出现反序列化异常时。请建议我该怎么做。
- 捕获异常并记录密钥以记录详细信息
- 将此记录发送到 DLT
添加 SeekToCurrentErrorHandler
,配置 DeadLetterPublishingRecoverer
到容器。
见the documentation and here。
反序列化异常被视为致命异常,不会重试。
如果您出于某种原因想要编写自己的错误处理程序,DeadLetterPublishingRecoverer
中的代码显示了如何提取异常信息。