升级到 spring-kafka 2.8 后,在 RecordInterceptor 中读取 kafka headers 时出现问题
Problem reading kafka headers in a RecordInterceptor after upgrading to spring-kafka 2.8
我有一个使用 spring boot & spring kafka 的应用程序,它在记录拦截器中获取传送尝试 header,以便我可以将其包含在日志消息中。它一直运行良好,直到我升级到 spring boot 2.6.3 和 spring kafka 2.8.2(从 2.5.5/2.7.7)
现在当我尝试读取投递尝试时 header 它不可用。如果我尝试在消息侦听器中做完全相同的事情,那么它工作正常,所以 header 显然在那里。
这是简化的记录拦截器和侦听器容器工厂的样子:
@Bean
public RecordInterceptor<Object, Object> recordInterceptor() {
return record -> {
int delivery = ByteBuffer.wrap(record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value()).getInt();
log.info("delivery " + delivery);
return record;
};
}
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.getContainerProperties().setDeliveryAttemptHeader(true);
factory.setRecordInterceptor(recordInterceptor());
return factory;
}
我在 spring 文档中看不到任何暗示行为应该改变的内容。有什么想法吗?
这是一个错误;我开了一个问题。 https://github.com/spring-projects/spring-kafka/issues/2082
我有一个使用 spring boot & spring kafka 的应用程序,它在记录拦截器中获取传送尝试 header,以便我可以将其包含在日志消息中。它一直运行良好,直到我升级到 spring boot 2.6.3 和 spring kafka 2.8.2(从 2.5.5/2.7.7)
现在当我尝试读取投递尝试时 header 它不可用。如果我尝试在消息侦听器中做完全相同的事情,那么它工作正常,所以 header 显然在那里。
这是简化的记录拦截器和侦听器容器工厂的样子:
@Bean
public RecordInterceptor<Object, Object> recordInterceptor() {
return record -> {
int delivery = ByteBuffer.wrap(record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value()).getInt();
log.info("delivery " + delivery);
return record;
};
}
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.getContainerProperties().setDeliveryAttemptHeader(true);
factory.setRecordInterceptor(recordInterceptor());
return factory;
}
我在 spring 文档中看不到任何暗示行为应该改变的内容。有什么想法吗?
这是一个错误;我开了一个问题。 https://github.com/spring-projects/spring-kafka/issues/2082