Spring Kafka - 在运行时订阅新主题
Spring Kafka - Subscribe new topics during runtime
我正在使用注释 @KafkaListener
在我的应用程序中使用主题。我的问题是,如果我在 kafka 中创建一个新主题,但我的消费者已经 运行,似乎消费者不会选择新主题,即使它与我正在使用的 topicPattern
相匹配.有没有办法定期 "refresh" 订阅的主题,以便选择新主题并重新平衡我的 运行 消费者?
我正在使用 Spring Kafka 1.2.2 和 Kafka 0.10.2.0。
此致
您不能在运行时动态添加主题;您必须 stop/start 容器才能开始收听新主题。
您可以 @Autowire
KafkaListenerEndpointRegistry
和 stop/start 听众 id
。
您还可以通过在注册表本身上调用 stop()
/start()
来 stop/start 所有侦听器。
下面的方法很适合我。
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
containerProps.setMessageListener(new MessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> message) {
logger.info("received: " + message);
}
});
container.setBeanName("testAuto");
container.start();
参考:http://docs.spring.io/spring-kafka/docs/1.0.0.RC1/reference/htmlsingle/
在实际应用中,我用的是ConcurrentMessageListenerContainer
而不是单线程的KafkaMessageListenerContainer
。
其实是可以的。
它适用于 Kafka 1.1.1。
在引擎盖下 Spring 使用 consumer.subscribe(topicPattern)
现在完全取决于 Kafka 库是否会被消费者看到消息。
有一个名为 metadata.max.age.ms
的消费者配置 属性,默认为 5 分钟。它基本上控制了客户端去代理获取更新的频率,这意味着消费者最多 5 分钟看不到新主题。您可以减小此值(例如 20 秒),并且应该会看到 KafkaListener 开始更快地从新主题中挑选消息。
我正在使用注释 @KafkaListener
在我的应用程序中使用主题。我的问题是,如果我在 kafka 中创建一个新主题,但我的消费者已经 运行,似乎消费者不会选择新主题,即使它与我正在使用的 topicPattern
相匹配.有没有办法定期 "refresh" 订阅的主题,以便选择新主题并重新平衡我的 运行 消费者?
我正在使用 Spring Kafka 1.2.2 和 Kafka 0.10.2.0。
此致
您不能在运行时动态添加主题;您必须 stop/start 容器才能开始收听新主题。
您可以 @Autowire
KafkaListenerEndpointRegistry
和 stop/start 听众 id
。
您还可以通过在注册表本身上调用 stop()
/start()
来 stop/start 所有侦听器。
下面的方法很适合我。
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
containerProps.setMessageListener(new MessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> message) {
logger.info("received: " + message);
}
});
container.setBeanName("testAuto");
container.start();
参考:http://docs.spring.io/spring-kafka/docs/1.0.0.RC1/reference/htmlsingle/
在实际应用中,我用的是ConcurrentMessageListenerContainer
而不是单线程的KafkaMessageListenerContainer
。
其实是可以的。
它适用于 Kafka 1.1.1。
在引擎盖下 Spring 使用 consumer.subscribe(topicPattern)
现在完全取决于 Kafka 库是否会被消费者看到消息。
有一个名为 metadata.max.age.ms
的消费者配置 属性,默认为 5 分钟。它基本上控制了客户端去代理获取更新的频率,这意味着消费者最多 5 分钟看不到新主题。您可以减小此值(例如 20 秒),并且应该会看到 KafkaListener 开始更快地从新主题中挑选消息。