如何使用 spring 引导在一个消费者 class 中顺序读取 2 个 Kafka 主题?
How to read 2 Kafka topics sequentially in one consumer class using spring boot?
我有 2 个 Kafka 主题 A 和 B。我希望每次当我的消费者启动时,它首先进入主题 A。主题 A 包含有关主题 B 的信息,我将根据该信息从主题 B 获取数据。
所以基本上我需要先阅读主题 A,然后再转到主题 B,我只需要在每次重新启动我的程序时这样做一次。
我能想到的事情:
@KafkaListener(topics = {"A" , "B"})
或:
@KafkaListener(topics = "A")
public void receive() {}
@KafkaListener(topics = "B")
public void receive() {}
两者都不保证读取顺序
如何强制我的程序先阅读主题 A,并在完成主题 A 的最新更新后才转到主题 B?
使用类似这样的东西...
@KafkaListener(id = "bReceiver", autoStartup = "false, topics = "B")
public void receive() {}
设置idleEventInterval
并添加
@Autowired
KafkaListenerEndpointRegistry registry;
@EventListener(condition = "event.listenerId == 'aReceiver`)
public void eventListener(ListenerContainerIdleEvent event) {
this.registry.getListenerContainer("aReceiver").stop(() -> { });
this.registry.getListenerContainer("bReceiver").start();
}
我有 2 个 Kafka 主题 A 和 B。我希望每次当我的消费者启动时,它首先进入主题 A。主题 A 包含有关主题 B 的信息,我将根据该信息从主题 B 获取数据。
所以基本上我需要先阅读主题 A,然后再转到主题 B,我只需要在每次重新启动我的程序时这样做一次。
我能想到的事情:
@KafkaListener(topics = {"A" , "B"})
或:
@KafkaListener(topics = "A")
public void receive() {}
@KafkaListener(topics = "B")
public void receive() {}
两者都不保证读取顺序
如何强制我的程序先阅读主题 A,并在完成主题 A 的最新更新后才转到主题 B?
使用类似这样的东西...
@KafkaListener(id = "bReceiver", autoStartup = "false, topics = "B")
public void receive() {}
设置idleEventInterval
并添加
@Autowired
KafkaListenerEndpointRegistry registry;
@EventListener(condition = "event.listenerId == 'aReceiver`)
public void eventListener(ListenerContainerIdleEvent event) {
this.registry.getListenerContainer("aReceiver").stop(() -> { });
this.registry.getListenerContainer("bReceiver").start();
}