Spring Kafka 的集成属性
Spring integration properties for Kafka
尝试在 application.yml 中使用侦听器配置属性时,我遇到了一个问题,如果我使用 application.yml 配置(listener.type = 批次)。只有当我在代码中将 setBatchListener 显式设置为 true 时,它才会被调用。这是我的代码和配置。
- 消费者代码:
@KafkaListener(containerFactory = "kafkaListenerContainerFactory",
topics = "${spring.kafka.template.default-topic}",
groupId = "${spring.kafka.consumer.group-id}")
public void receive(List<ConsumerRecord<String,byte[]>> consumerRecords,Acknowledgment acknowledgment){
processor.process(consumerRecords,acknowledgment);
}
- application.yml:
listener:
missing-topics-fatal: false
type: batch
ack-mode: manual
消费者配置:
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(
new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()));
factory.setErrorHandler(new SeekToCurrentErrorHandler( new UpdateMessageErrorHandler(),new FixedBackOff(idleEventInterval,maxFailures)));
final ContainerProperties properties = factory.getContainerProperties();
properties.setIdleBetweenPolls(idleBetweenPolls);
properties.setIdleEventInterval(idleEventInterval);
return factory;
}
如果我没记错的话,通过在您的配置中使用 ConcurrentKafkaListenerContainerFactory
构建器,您实质上是在重写通常在 ConcurrentKafkaListenerContainerFactoryConfigurer
class 内执行的一段代码 spring 自动配置包:
if (properties.getType().equals(Type.BATCH)) {
factory.setBatchListener(true);
factory.setBatchErrorHandler(this.batchErrorHandler);
} else {
factory.setErrorHandler(this.errorHandler);
}
既然它是硬编码在您的 application.yaml
文件中,为什么在您的 @Configuration
文件中配置它是一件坏事?
尝试在 application.yml 中使用侦听器配置属性时,我遇到了一个问题,如果我使用 application.yml 配置(listener.type = 批次)。只有当我在代码中将 setBatchListener 显式设置为 true 时,它才会被调用。这是我的代码和配置。
- 消费者代码:
@KafkaListener(containerFactory = "kafkaListenerContainerFactory",
topics = "${spring.kafka.template.default-topic}",
groupId = "${spring.kafka.consumer.group-id}")
public void receive(List<ConsumerRecord<String,byte[]>> consumerRecords,Acknowledgment acknowledgment){
processor.process(consumerRecords,acknowledgment);
}
- application.yml:
listener:
missing-topics-fatal: false
type: batch
ack-mode: manual
消费者配置:
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory( new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties())); factory.setErrorHandler(new SeekToCurrentErrorHandler( new UpdateMessageErrorHandler(),new FixedBackOff(idleEventInterval,maxFailures))); final ContainerProperties properties = factory.getContainerProperties(); properties.setIdleBetweenPolls(idleBetweenPolls); properties.setIdleEventInterval(idleEventInterval); return factory;
}
如果我没记错的话,通过在您的配置中使用 ConcurrentKafkaListenerContainerFactory
构建器,您实质上是在重写通常在 ConcurrentKafkaListenerContainerFactoryConfigurer
class 内执行的一段代码 spring 自动配置包:
if (properties.getType().equals(Type.BATCH)) {
factory.setBatchListener(true);
factory.setBatchErrorHandler(this.batchErrorHandler);
} else {
factory.setErrorHandler(this.errorHandler);
}
既然它是硬编码在您的 application.yaml
文件中,为什么在您的 @Configuration
文件中配置它是一件坏事?