在使用 seekToErrorHandler 消费来自 kafka 主题的消息时,如何将导致 DeserializationException 的记录发送到 DLT?
how can i send the records causing DeserializationException to a DLT while consuming a message from kafka topic using seekToErrorHandler?
我正在使用 spring boot 2.1.7.RELEASE 和 spring-kafka 2.2.8.RELEASE。我们正在升级 spring 引导版本,但目前,我们正在使用此 spring-kafka 版本。
我正在使用@KafkaListener 注释来创建一个消费者,并且我正在使用 consumer.And 的所有默认设置,我正在使用 Spring-Kafka 文档中指定的以下配置.
// other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, AvroDeserializer.class.getName());
return new DefaultKafkaConsumerFactory<>(props);
现在,我通过扩展 SeekToCurrentErrorHandler 来捕获发送导致反序列化异常的记录并将它们发送到 DLT,从而实现了我的自定义 SeekToCurrentErrorHandler。
现在的问题是,当我尝试使用 30 条消息和具有反序列化异常的备用消息测试此逻辑时,处理方法列表获取所有 30 条消息,而不是仅获取 15 条消息,这导致例外。话虽如此,我怎样才能得到例外的记录?请提出建议。
这是我自定义的 SeekToCurrentErrorHandler 代码
@Component
public class MySeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {
private final MyDeadLetterRecoverer deadLetterRecoverer;
@Autowired
public MySeekToCurrentErrorHandler(MyDeadLetterRecoverer deadLetterRecoverer) {
super(-1);
this.deadLetterRecoverer = deadLetterRecoverer;
}
@Override
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> data, Consumer<?, ?> consumer, MessageListenerContainer container) {
if (thrownException instanceof DeserializationException) {
//Improve to support multiple records
DeserializationException deserializationException = (DeserializationException) thrownException;
deadLetterRecoverer.accept(data.get(0), deserializationException);
ConsumerRecord<?, ?>. consumerRecord = data.get(0);
sout(consumerRecord.key());
sout(consumerRecord.value());
} else {
//Calling super method to let the 'SeekToCurrentErrorHandler' do what it is actually designed for
super.handle(thrownException, data, consumer, container);
}
}
}
我们必须传递所有剩余的记录,以便 STCEH 可以为未处理的记录重新寻找所有分区。
恢复失败记录后,使用SeekUtils查找剩余记录(从列表中删除已恢复的记录)。
将 recoverable
设置为 false
,这样 doSeeks()
就不会尝试恢复新的第一条记录。
/**
* Seek records to earliest position, optionally skipping the first.
* @param records the records.
* @param consumer the consumer.
* @param exception the exception
* @param recoverable true if skipping the first record is allowed.
* @param skipper function to determine whether or not to skip seeking the first.
* @param logger a {@link Log} for seek errors.
* @return true if the failed record was skipped.
*/
public static boolean doSeeks(List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, Exception exception,
boolean recoverable, BiPredicate<ConsumerRecord<?, ?>, Exception> skipper, Log logger) {
当您移动到更新的版本时,您将不需要所有这些代码(不再支持 Apache Kafka 2.2 的 Boot 2.1 和 Spring)。
我正在使用 spring boot 2.1.7.RELEASE 和 spring-kafka 2.2.8.RELEASE。我们正在升级 spring 引导版本,但目前,我们正在使用此 spring-kafka 版本。
我正在使用@KafkaListener 注释来创建一个消费者,并且我正在使用 consumer.And 的所有默认设置,我正在使用 Spring-Kafka 文档中指定的以下配置.
// other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, AvroDeserializer.class.getName());
return new DefaultKafkaConsumerFactory<>(props);
现在,我通过扩展 SeekToCurrentErrorHandler 来捕获发送导致反序列化异常的记录并将它们发送到 DLT,从而实现了我的自定义 SeekToCurrentErrorHandler。
现在的问题是,当我尝试使用 30 条消息和具有反序列化异常的备用消息测试此逻辑时,处理方法列表获取所有 30 条消息,而不是仅获取 15 条消息,这导致例外。话虽如此,我怎样才能得到例外的记录?请提出建议。
这是我自定义的 SeekToCurrentErrorHandler 代码
@Component
public class MySeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {
private final MyDeadLetterRecoverer deadLetterRecoverer;
@Autowired
public MySeekToCurrentErrorHandler(MyDeadLetterRecoverer deadLetterRecoverer) {
super(-1);
this.deadLetterRecoverer = deadLetterRecoverer;
}
@Override
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> data, Consumer<?, ?> consumer, MessageListenerContainer container) {
if (thrownException instanceof DeserializationException) {
//Improve to support multiple records
DeserializationException deserializationException = (DeserializationException) thrownException;
deadLetterRecoverer.accept(data.get(0), deserializationException);
ConsumerRecord<?, ?>. consumerRecord = data.get(0);
sout(consumerRecord.key());
sout(consumerRecord.value());
} else {
//Calling super method to let the 'SeekToCurrentErrorHandler' do what it is actually designed for
super.handle(thrownException, data, consumer, container);
}
}
}
我们必须传递所有剩余的记录,以便 STCEH 可以为未处理的记录重新寻找所有分区。
恢复失败记录后,使用SeekUtils查找剩余记录(从列表中删除已恢复的记录)。
将 recoverable
设置为 false
,这样 doSeeks()
就不会尝试恢复新的第一条记录。
/**
* Seek records to earliest position, optionally skipping the first.
* @param records the records.
* @param consumer the consumer.
* @param exception the exception
* @param recoverable true if skipping the first record is allowed.
* @param skipper function to determine whether or not to skip seeking the first.
* @param logger a {@link Log} for seek errors.
* @return true if the failed record was skipped.
*/
public static boolean doSeeks(List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, Exception exception,
boolean recoverable, BiPredicate<ConsumerRecord<?, ?>, Exception> skipper, Log logger) {
当您移动到更新的版本时,您将不需要所有这些代码(不再支持 Apache Kafka 2.2 的 Boot 2.1 和 Spring)。