Spring Kafka 是否需要 kafka 容器工厂?

Is kafka container factory a requirement in Spring Kafka?

我有一个简单的消费者在 Spring 工作。我有一个配置 class 定义了一堆工厂等。当我删除配置 class 时,消费者仍然工作。我想知道拥有工厂的好处,即:

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,
            GenericRecord> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        return factory;
    }

    public ConsumerFactory<String, GenericRecord> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(retrieveConsumerConfigs());
    }

现在只需通过应用程序属性传递 val 即可。我在基于 class 的方法中明确控制了配置,但我也在想我可以放弃 class 并通过 spring env 变量提供 vals,例如 [=16] =],例如。

@KafkaListener 方法需要容器工厂。

Spring 如果您不提供自己的 bean,引导将自动配置一个(来自应用程序。properties/yml)。参见 KafkaAutoConfiguration

引导还将配置消费者工厂(如果您不这样做)。

应用程序通常不需要声明任何基础结构 bean。

编辑

我宁愿从不声明我自己的基础结构 bean。如果我需要一些未作为引导 属性 公开的功能,或者我想只为一个容器覆盖某些 属性 的地方,我只需添加一个自定义程序 bean。

@Component
class Customizer {

    public Customizer(ConcurrentKafkaListenerContainerFactory<?, ?> factory) {

        factory.setContainerCustomizer(container -> {
            if (container.getContainerProperties().getGroupId().equals("slowGroup")) {
                container.getContainerProperties().setIdleBetweenPolls(60_000);
            }
        });
    }

}

@Component
class Customizer {

    Customizer(AbstractKafkaListenerContainerFactory<?, ?, ?> containerFactory,
            ThreadPoolTaskExecutor exec) {

        containerFactory.getContainerProperties().setConsumerTaskExecutor(exec);
    }

}

等等

Spring 中的简单使用者工作是因为 spring- 引擎盖后面的引导自动配置创建了一个 ConcurrentKafkaListenerContainerFactory 的对象并将其注册到 spring 容器。

您可以通过注入 KafkaListenerContainerFactory 的实现来验证它,如下所示:

@RestController
public class EmployeeController {

    private final KafkaListenerContainerFactory kafkaListenerContainerFactory;

    @Autowired
    public EmployeeController(KafkaListenerContainerFactory kafkaListenerContainerFactory) {
        System.out.println(kafkaListenerContainerFactory instanceof ConcurrentKafkaListenerContainerFactory);
        this.kafkaListenerContainerFactory = kafkaListenerContainerFactory;
    }
}

但是如果您对 spring boot 的自动生成的 bean 不满意,您可以创建自己的 bean 并使用 @Bean 注释将其注册到 spring 容器