我们可以为不同的主题使用相同的容器吗?

can we use same container for different topics?

我有两个不同的 Kafka 主题。两个主题中的数据类型和数据结构 (Json) 相同。 我可以为两个主题使用相同的侦听器容器吗? 我尝试使用并且能够从两个主题中获取输出。

以下是我的配置。

@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
    ConcurrentKafkaListenerContainerFactoryConfigurer factoryConfigure,
    ConsumerFactory<Object, Object> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factoryConfigure.configure(factory, kafkaConsumerFactory);
    factory.setBatchListener(true);
    return factory;
}

这是我的听众。


@KafkaListener(id = "#{'${spring.kafka.listener.id}'}", topics = "#{'${spring.kafka.consumer.topic}'}", groupId = "#{'${spring.kafka.consumer.group-id}'}")
public void getTopics( List<Request> model) {

    System.out.println("I am from topic1");
    System.out.println(model);

}

@KafkaListener(id = "#{'${spring.kafka.listener.id2}'}", topics = "#{'${spring.kafka.consumer.topic2}'}", groupId = "#{'${spring.kafka.consumer.group-id2}'}")
public void getTopics( List<Request> model) {

System.out.println("I am from topic2");
System.out.println(model);

}

我没有明确使用任何注释来提供容器名称。 这会导致我遇到任何问题吗?

不清楚你问的是什么;容器工厂将为每个侦听器创建一个不同的容器。

topics 属性 是一个数组所以你可以使用

@KafkaListener(id = "#{'${spring.kafka.listener.id}'}", 
    topics = { "${spring.kafka.consumer.topic}",
               "${spring.kafka.consumer.topic2}" },
    groupId = "#{'${spring.kafka.consumer.group-id}'}")
public void getTopics( List<Request> model) {

    System.out.println("I am from both topics");
    System.out.println(model);

}

如果您需要知道每条记录来自哪个主题,可以使用

public void getTopics( List<Request> model,
     @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics) {

索引 n 的主题用于模型索引 n 的记录。