反序列化异常时如何捕获ConsumerRecord的值?
How can I capture the value of ConsumerRecord when there is a deserialization exception?
我正在使用 spring boot 2.1.7.RELEASE 和 spring-kafka 2.2.8.RELEASE.And 我正在使用 @KafkaListener 注释创建一个消费者,我我正在使用消费者的所有默认设置。
我扩展了 SeekToCurrentErrorHandler 并重写了 handle 方法,如下所示。现在我的问题是,我得到的值是 null 而不是实际值,因为它无法反序列化该值。在这种情况下,如果我想用键和值对 ConsumerRecord 进行 DLT,是否可能?
@Override
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> data,
Consumer<?, ?> consumer, MessageListenerContainer container) {
if(thrownException instanceof DeserializationException){
final String messageKey = (String) data.get(0).key();
logger.logException(thrownException, "DeserializerException occurred for message key"+messageKey, getClass());
System.out.println("messageKey is "+messageKey);
System.out.println("Value is "+data.get(0).value());
List<Object> list = Arrays.stream(data.get(0).headers().toArray()).filter( header -> {
return header.key().contains("springDeserializerExceptionValue");
}).collect(Collectors.toList());
//TODO: Identify how do we DLT the message if we don't get the value in the consumer record
} else
//Calling super method to let the 'SeekToCurrentErrorHandler' do what it is actually designed for
super.handle(thrownException, data, consumer, container);
}
失败的值在异常中 - getData()。
我正在使用 spring boot 2.1.7.RELEASE 和 spring-kafka 2.2.8.RELEASE.And 我正在使用 @KafkaListener 注释创建一个消费者,我我正在使用消费者的所有默认设置。
我扩展了 SeekToCurrentErrorHandler 并重写了 handle 方法,如下所示。现在我的问题是,我得到的值是 null 而不是实际值,因为它无法反序列化该值。在这种情况下,如果我想用键和值对 ConsumerRecord 进行 DLT,是否可能?
@Override
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> data,
Consumer<?, ?> consumer, MessageListenerContainer container) {
if(thrownException instanceof DeserializationException){
final String messageKey = (String) data.get(0).key();
logger.logException(thrownException, "DeserializerException occurred for message key"+messageKey, getClass());
System.out.println("messageKey is "+messageKey);
System.out.println("Value is "+data.get(0).value());
List<Object> list = Arrays.stream(data.get(0).headers().toArray()).filter( header -> {
return header.key().contains("springDeserializerExceptionValue");
}).collect(Collectors.toList());
//TODO: Identify how do we DLT the message if we don't get the value in the consumer record
} else
//Calling super method to let the 'SeekToCurrentErrorHandler' do what it is actually designed for
super.handle(thrownException, data, consumer, container);
}
失败的值在异常中 - getData()。