升级到 spring-kafka 2.8 后,在 RecordInterceptor 中读取 kafka headers 时出现问题

Problem reading kafka headers in a RecordInterceptor after upgrading to spring-kafka 2.8

我有一个使用 spring boot & spring kafka 的应用程序,它在记录拦截器中获取传送尝试 header,以便我可以将其包含在日志消息中。它一直运行良好,直到我升级到 spring boot 2.6.3 和 spring kafka 2.8.2(从 2.5.5/2.7.7)

现在当我尝试读取投递尝试时 header 它不可用。如果我尝试在消息侦听器中做完全相同的事情,那么它工作正常,所以 header 显然在那里。

这是简化的记录拦截器和侦听器容器工厂的样子:

        @Bean
        public RecordInterceptor<Object, Object> recordInterceptor() {
            return record -> {
                int delivery = ByteBuffer.wrap(record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value()).getInt();
                log.info("delivery " + delivery);
                return record;
            };
        }


        @Bean
        public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
                ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
                ConsumerFactory<Object, Object> kafkaConsumerFactory) {

            ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
            configurer.configure(factory, kafkaConsumerFactory);
            factory.getContainerProperties().setDeliveryAttemptHeader(true);
            factory.setRecordInterceptor(recordInterceptor());

            return factory;
        }

我在 spring 文档中看不到任何暗示行为应该改变的内容。有什么想法吗?

这是一个错误;我开了一个问题。 https://github.com/spring-projects/spring-kafka/issues/2082