如何部署处于暂停模式的 kafka 消费者,直到我发出开始消费消息的信号
how to deploy a kafka consumer being in pause mode until i signal to start consume the messages
我正在使用 spring-kafka 2.2.8 并试图了解是否可以选择部署处于暂停模式的 kafka 消费者,直到我发出开始消费消息的信号。请提出建议。
我在下面看到 post,我们可以暂停和启动消费者,但我需要消费者在部署时处于暂停模式。
how to pause and resume @KafkaListener using spring-kafka
@KafkaListener(id = "foo", ..., autoStartup = "false")
然后在准备就绪后使用 KafkaListenerEndpointRegistry
启动它
registry.getListenerContainer("foo").start();
以暂停模式启动它没有多大意义,但您可以这样做...
@SpringBootApplication
public class So62329274Application {
public static void main(String[] args) {
SpringApplication.run(So62329274Application.class, args);
}
@KafkaListener(id = "so62329274", topics = "so62329274", autoStartup = "false")
public void listen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so62329274").partitions(1).replicas(1).build();
}
@Bean
public ApplicationRunner runner(KafkaListenerEndpointRegistry registry, KafkaTemplate<String, String> template) {
return args -> {
template.send("so62329274", "foo");
registry.getListenerContainer("so62329274").pause();
registry.getListenerContainer("so62329274").start();
System.in.read();
registry.getListenerContainer("so62329274").resume();
};
}
}
分配分区后,您将看到这样的日志消息:
Paused consumer resumed by Kafka due to rebalance; consumer paused again, so the initial poll() will never return any records
我正在使用 spring-kafka 2.2.8 并试图了解是否可以选择部署处于暂停模式的 kafka 消费者,直到我发出开始消费消息的信号。请提出建议。
我在下面看到 post,我们可以暂停和启动消费者,但我需要消费者在部署时处于暂停模式。 how to pause and resume @KafkaListener using spring-kafka
@KafkaListener(id = "foo", ..., autoStartup = "false")
然后在准备就绪后使用 KafkaListenerEndpointRegistry
启动它
registry.getListenerContainer("foo").start();
以暂停模式启动它没有多大意义,但您可以这样做...
@SpringBootApplication
public class So62329274Application {
public static void main(String[] args) {
SpringApplication.run(So62329274Application.class, args);
}
@KafkaListener(id = "so62329274", topics = "so62329274", autoStartup = "false")
public void listen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so62329274").partitions(1).replicas(1).build();
}
@Bean
public ApplicationRunner runner(KafkaListenerEndpointRegistry registry, KafkaTemplate<String, String> template) {
return args -> {
template.send("so62329274", "foo");
registry.getListenerContainer("so62329274").pause();
registry.getListenerContainer("so62329274").start();
System.in.read();
registry.getListenerContainer("so62329274").resume();
};
}
}
分配分区后,您将看到这样的日志消息:
Paused consumer resumed by Kafka due to rebalance; consumer paused again, so the initial poll() will never return any records