Spring 启动 kafka 在 application.properties 中设置 ackOnError
Spring boot kafka set ackOnError in application.properties
有没有办法像其他侦听器属性一样使用 spring 启动 application.properties 文件来设置 属性 ackOnError=false,例如:
spring.kafka.listener.ack-mode
spring.kafka.listener.ack-count
spring.kafka.listener.ack-time
spring.kafka.listener.poll-timeout
?
如果不可能,我如何组合:来自文件的属性 + java 配置?我不想像这样在 java-config 中设置所有 kafka 属性:
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
......
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return props;
}
我只想覆盖 属性 ackOnError。
提前谢谢你。
它不能作为 属性 使用,但您可以按如下方式覆盖 Boot 的容器工厂 @Bean
...
@Bean
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.getContainerProperties().setAckOnError(false);
return factory;
}
这也将应用所有其他启动属性。
但是,除非您也停止容器(例如使用 ContainerStoppingErrorHandler
),否则此设置没有多大用处。
这是因为无论如何都会提交下一个成功记录的偏移量,这超出了失败记录的偏移量。
也就是说,在 2.3 中,默认为 false
。
有没有办法像其他侦听器属性一样使用 spring 启动 application.properties 文件来设置 属性 ackOnError=false,例如:
spring.kafka.listener.ack-mode
spring.kafka.listener.ack-count
spring.kafka.listener.ack-time
spring.kafka.listener.poll-timeout
?
如果不可能,我如何组合:来自文件的属性 + java 配置?我不想像这样在 java-config 中设置所有 kafka 属性:
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
......
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return props;
}
我只想覆盖 属性 ackOnError。 提前谢谢你。
它不能作为 属性 使用,但您可以按如下方式覆盖 Boot 的容器工厂 @Bean
...
@Bean
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.getContainerProperties().setAckOnError(false);
return factory;
}
这也将应用所有其他启动属性。
但是,除非您也停止容器(例如使用 ContainerStoppingErrorHandler
),否则此设置没有多大用处。
这是因为无论如何都会提交下一个成功记录的偏移量,这超出了失败记录的偏移量。
也就是说,在 2.3 中,默认为 false
。