Spring Kafka 的集成属性

Spring integration properties for Kafka

尝试在 application.yml 中使用侦听器配置属性时,我遇到了一个问题,如果我使用 application.yml 配置(listener.type = 批次)。只有当我在代码中将 setBatchListener 显式设置为 true 时,它​​才会被调用。这是我的代码和配置。

  1. 消费者代码:
@KafkaListener(containerFactory = "kafkaListenerContainerFactory",
        topics = "${spring.kafka.template.default-topic}",
        groupId = "${spring.kafka.consumer.group-id}")
    public void receive(List<ConsumerRecord<String,byte[]>> consumerRecords,Acknowledgment acknowledgment){
        processor.process(consumerRecords,acknowledgment);
    }
  1. application.yml:
    listener:
      missing-topics-fatal: false
      type: batch
      ack-mode: manual
  1. 消费者配置:

     ConcurrentKafkaListenerContainerFactory<String, String> factory =
             new ConcurrentKafkaListenerContainerFactory<>();
     factory.setConsumerFactory(
             new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()));
     factory.setErrorHandler(new SeekToCurrentErrorHandler( new UpdateMessageErrorHandler(),new FixedBackOff(idleEventInterval,maxFailures)));
     final ContainerProperties properties = factory.getContainerProperties();
     properties.setIdleBetweenPolls(idleBetweenPolls);
     properties.setIdleEventInterval(idleEventInterval);
     return factory;
    

    }

如果我没记错的话,通过在您的配置中使用 ConcurrentKafkaListenerContainerFactory 构建器,您实质上是在重写通常在 ConcurrentKafkaListenerContainerFactoryConfigurer class 内执行的一段代码 spring 自动配置包:

        if (properties.getType().equals(Type.BATCH)) {
            factory.setBatchListener(true);
            factory.setBatchErrorHandler(this.batchErrorHandler);
        } else {
            factory.setErrorHandler(this.errorHandler);
        }

既然它是硬编码在您的 application.yaml 文件中,为什么在您的 @Configuration 文件中配置它是一件坏事?