Spring Kafka:按顺序阅读两个不同的主题
Spring Kafka: Read from two different topics in order
使用 Spring Kafka 是否可以按保证的顺序读取不同消费者的两个不同主题?例如,主题 A 存储与确定如何处理存储在主题 B 中的数据相关的信息。主题 A 被读入内存并被引用,但在从主题 B 读取数据之前需要完全填充。下面是一个我当前设置的示例...
@Service
public class TopicA {
@KafkaListener(topics = "topicA")
public void consume(ConsumerRecord<String, byte[]> record) {
// ... some code here to populate an in-memory data structure
}
}
@Service
public class TopicB {
@KafkaListener(topics = "topicB")
public void consume(ConsumerRecord<String, byte[]> record) {
// ... some code here that depends on topic A having populated the in-memory data structure
}
}
到目前为止,我一直倾向于创建一个 Spring 启动过程(使用 @PostConstruct),该过程通过首先从主题 A 读取来初始化数据结构,但一直无法使其正常工作。有没有人有什么建议?提前致谢!
@KafkaListener(id = "bConsumer" topics = "topicB", autoStartup = "false")
然后在您准备好后自动装配 KafkaListenerEndpointRegistry
bean 和 registry.getListenerContainer("bConsumer").start();
。
使用 Spring Kafka 是否可以按保证的顺序读取不同消费者的两个不同主题?例如,主题 A 存储与确定如何处理存储在主题 B 中的数据相关的信息。主题 A 被读入内存并被引用,但在从主题 B 读取数据之前需要完全填充。下面是一个我当前设置的示例...
@Service
public class TopicA {
@KafkaListener(topics = "topicA")
public void consume(ConsumerRecord<String, byte[]> record) {
// ... some code here to populate an in-memory data structure
}
}
@Service
public class TopicB {
@KafkaListener(topics = "topicB")
public void consume(ConsumerRecord<String, byte[]> record) {
// ... some code here that depends on topic A having populated the in-memory data structure
}
}
到目前为止,我一直倾向于创建一个 Spring 启动过程(使用 @PostConstruct),该过程通过首先从主题 A 读取来初始化数据结构,但一直无法使其正常工作。有没有人有什么建议?提前致谢!
@KafkaListener(id = "bConsumer" topics = "topicB", autoStartup = "false")
然后在您准备好后自动装配 KafkaListenerEndpointRegistry
bean 和 registry.getListenerContainer("bConsumer").start();
。