如何手动启动 Kafka 监听器?

How to start Kafka listener manually?

我有一些方法用 @KafkaListener 注释,但我只想手动启动其中一些(取决于某些条件)。

@KafkaListener(id = "consumer1", topics = "topic-name", clientIdPrefix = "client-prefix", autoStartup = "false")
public void consumer1(String message) {
    // consume
}
@PostConstruct
private void startConsumers() {
    if (true) {
        kafkaListenerEndpointRegistry.getListenerContainer("consumer1").start();
    }
}

但此时 kafkaListenerEndpointRegistry.getListenerContainers() 是空列表并且 kafkaListenerEndpointRegistry.getListenerContainer("consumer1") returns null。所以也许 @PostConstruct 方法被调用的时间太早了,监听器还没有注册。 我尝试用 @Scheduled(fixedDelay = 100) 注释 startConsumers() 方法,并且侦听器已经可用。但是对于我想在启动应用程序后调用一次的东西,使用 @Scheduled 不是一个好的决定。

您不能在 @PostConstruct 中执行此操作 - 在应用程序上下文生命周期中还为时过早。

实施 SmartLifecyle 将阶段设置为 Integer.MAX_VALUE 并在 start() 方法中启动容器。

或使用 @EventListener 并侦听 ApplicationStartedEvent(如果使用 Spring 启动)或 ContextRefreshedEvent 用于非启动 Spring 应用程序。