Spring 启动 kafka 在 application.properties 中设置 ackOnError

Spring boot kafka set ackOnError in application.properties

有没有办法像其他侦听器属性一样使用 spring 启动 application.properties 文件来设置 属性 ackOnError=false,例如:

spring.kafka.listener.ack-mode
spring.kafka.listener.ack-count
spring.kafka.listener.ack-time
spring.kafka.listener.poll-timeout

?

如果不可能,我如何组合:来自文件的属性 + java 配置?我不想像这样在 java-config 中设置所有 kafka 属性:

 @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        ......
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return props;
    }

我只想覆盖 属性 ackOnError。 提前谢谢你。

它不能作为 属性 使用,但您可以按如下方式覆盖 Boot 的容器工厂 @Bean...

@Bean
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory) {

    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    factory.getContainerProperties().setAckOnError(false);
    return factory;
}

这也将应用所有其他启动属性。

但是,除非您也停止容器(例如使用 ContainerStoppingErrorHandler),否则此设置没有多大用处。

这是因为无论如何都会提交下一个成功记录的偏移量,这超出了失败记录的偏移量。

也就是说,在 2.3 中,默认为 false