在使用 seekToErrorHandler 消费来自 kafka 主题的消息时,如何将导致 DeserializationException 的记录发送到 DLT?

how can i send the records causing DeserializationException to a DLT while consuming a message from kafka topic using seekToErrorHandler?

我正在使用 spring boot 2.1.7.RELEASE 和 spring-kafka 2.2.8.RELEASE。我们正在升级 spring 引导版本,但目前,我们正在使用此 spring-kafka 版本。

我正在使用@KafkaListener 注释来创建一个消费者,并且我正在使用 consumer.And 的所有默认设置,我正在使用 Spring-Kafka 文档中指定的以下配置.

    // other props
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, AvroDeserializer.class.getName());
        return new DefaultKafkaConsumerFactory<>(props);
        

现在,我通过扩展 SeekToCurrentErrorHandler 来捕获发送导致反序列化异常的记录并将它们发送到 DLT,从而实现了我的自定义 SeekToCurrentErrorHandler。

现在的问题是,当我尝试使用 30 条消息和具有反序列化异常的备用消息测试此逻辑时,处理方法列表获取所有 30 条消息,而不是仅获取 15 条消息,这导致例外。话虽如此,我怎样才能得到例外的记录?请提出建议。

这是我自定义的 SeekToCurrentErrorHandler 代码

    @Component
    public class MySeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {

        private final MyDeadLetterRecoverer deadLetterRecoverer;

        @Autowired
        public MySeekToCurrentErrorHandler(MyDeadLetterRecoverer deadLetterRecoverer) {
            super(-1);
            this.deadLetterRecoverer = deadLetterRecoverer;
        }

        @Override
        public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> data, Consumer<?, ?> consumer, MessageListenerContainer container) {
            if (thrownException instanceof DeserializationException) {
                //Improve to support multiple records
                DeserializationException deserializationException = (DeserializationException) thrownException;
                deadLetterRecoverer.accept(data.get(0), deserializationException);
            
                ConsumerRecord<?, ?>. consumerRecord = data.get(0);
                sout(consumerRecord.key());
                sout(consumerRecord.value());
            } else {
                //Calling super method to let the 'SeekToCurrentErrorHandler' do what it is actually designed for
                super.handle(thrownException, data, consumer, container);
            }
        }
    }

我们必须传递所有剩余的记录,以便 STCEH 可以为未处理的记录重新寻找所有分区。

恢复失败记录后,使用SeekUtils查找剩余记录(从列表中删除已恢复的记录)。

recoverable 设置为 false,这样 doSeeks() 就不会尝试恢复新的第一条记录。

    /**
     * Seek records to earliest position, optionally skipping the first.
     * @param records the records.
     * @param consumer the consumer.
     * @param exception the exception
     * @param recoverable true if skipping the first record is allowed.
     * @param skipper function to determine whether or not to skip seeking the first.
     * @param logger a {@link Log} for seek errors.
     * @return true if the failed record was skipped.
     */
    public static boolean doSeeks(List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, Exception exception,
            boolean recoverable, BiPredicate<ConsumerRecord<?, ?>, Exception> skipper, Log logger) {

当您移动到更新的版本时,您将不需要所有这些代码(不再支持 Apache Kafka 2.2 的 Boot 2.1 和 Spring)。