如何手动启动 Kafka 监听器?
How to start Kafka listener manually?
我有一些方法用 @KafkaListener
注释,但我只想手动启动其中一些(取决于某些条件)。
@KafkaListener(id = "consumer1", topics = "topic-name", clientIdPrefix = "client-prefix", autoStartup = "false")
public void consumer1(String message) {
// consume
}
@PostConstruct
private void startConsumers() {
if (true) {
kafkaListenerEndpointRegistry.getListenerContainer("consumer1").start();
}
}
但此时 kafkaListenerEndpointRegistry.getListenerContainers()
是空列表并且 kafkaListenerEndpointRegistry.getListenerContainer("consumer1")
returns null
。所以也许 @PostConstruct
方法被调用的时间太早了,监听器还没有注册。
我尝试用 @Scheduled(fixedDelay = 100)
注释 startConsumers()
方法,并且侦听器已经可用。但是对于我想在启动应用程序后调用一次的东西,使用 @Scheduled
不是一个好的决定。
您不能在 @PostConstruct
中执行此操作 - 在应用程序上下文生命周期中还为时过早。
实施 SmartLifecyle
将阶段设置为 Integer.MAX_VALUE
并在 start()
方法中启动容器。
或使用 @EventListener
并侦听 ApplicationStartedEvent
(如果使用 Spring 启动)或 ContextRefreshedEvent
用于非启动 Spring 应用程序。
我有一些方法用 @KafkaListener
注释,但我只想手动启动其中一些(取决于某些条件)。
@KafkaListener(id = "consumer1", topics = "topic-name", clientIdPrefix = "client-prefix", autoStartup = "false")
public void consumer1(String message) {
// consume
}
@PostConstruct
private void startConsumers() {
if (true) {
kafkaListenerEndpointRegistry.getListenerContainer("consumer1").start();
}
}
但此时 kafkaListenerEndpointRegistry.getListenerContainers()
是空列表并且 kafkaListenerEndpointRegistry.getListenerContainer("consumer1")
returns null
。所以也许 @PostConstruct
方法被调用的时间太早了,监听器还没有注册。
我尝试用 @Scheduled(fixedDelay = 100)
注释 startConsumers()
方法,并且侦听器已经可用。但是对于我想在启动应用程序后调用一次的东西,使用 @Scheduled
不是一个好的决定。
您不能在 @PostConstruct
中执行此操作 - 在应用程序上下文生命周期中还为时过早。
实施 SmartLifecyle
将阶段设置为 Integer.MAX_VALUE
并在 start()
方法中启动容器。
或使用 @EventListener
并侦听 ApplicationStartedEvent
(如果使用 Spring 启动)或 ContextRefreshedEvent
用于非启动 Spring 应用程序。