在 spring 中从多线程 kafka 侦听器中消费一个主题

Consume onetopic from multithread kafkalisteners in spring

嗨,我有一个主题,我在 spring 中借助@KafkaListener 注释使用了这个主题的消息 我想从多线程侦听器中使用主题,我该如何实现?

ConcurrentKafkaListenerContainerFactory中有一个属性设置并发,但是这种方法正确吗? ConcurrentKafkaListenerContainerFactory.setConcurrency(3),一个监听器和三个不同的线程来消费一个主题?

是的,这将创建 3 个线程,但主题至少需要 3 个分区;您不能从单个分区消费多个消费者。

如果您有多个侦听器,则将并发添加为 @KafkaListener 的 属性。例如,

    @KafkaListener(id = "consumer1", topics = {"topic1","topic2"}, groupId = "group1", concurrency = "4")
    public void consumeLog1(){
    }

    @KafkaListener(id = "consumer2", topics = {"topic2","topic3"}, groupId = "group1", concurrency = "5")
    public void consumeLog2(){
    }

consumer1 将创建 4 个 KafkaConsumersconsumer2 将创建 5 个 KafkaConsumers。他们将听取分配给他们的主题。

如果您只有一个 @KafkaListener,则将默认并发添加为 属性。

spring.kafka.listener.concurrency=3

此值将被 @KafkaListener 中的并发值覆盖。