在 spring 中从多线程 kafka 侦听器中消费一个主题
Consume onetopic from multithread kafkalisteners in spring
嗨,我有一个主题,我在 spring 中借助@KafkaListener 注释使用了这个主题的消息
我想从多线程侦听器中使用主题,我该如何实现?
ConcurrentKafkaListenerContainerFactory中有一个属性设置并发,但是这种方法正确吗?
ConcurrentKafkaListenerContainerFactory.setConcurrency(3),一个监听器和三个不同的线程来消费一个主题?
是的,这将创建 3 个线程,但主题至少需要 3 个分区;您不能从单个分区消费多个消费者。
如果您有多个侦听器,则将并发添加为 @KafkaListener
的 属性。例如,
@KafkaListener(id = "consumer1", topics = {"topic1","topic2"}, groupId = "group1", concurrency = "4")
public void consumeLog1(){
}
@KafkaListener(id = "consumer2", topics = {"topic2","topic3"}, groupId = "group1", concurrency = "5")
public void consumeLog2(){
}
consumer1
将创建 4 个 KafkaConsumers
,consumer2
将创建 5 个 KafkaConsumers
。他们将听取分配给他们的主题。
如果您只有一个 @KafkaListener
,则将默认并发添加为 属性。
spring.kafka.listener.concurrency=3
此值将被 @KafkaListener
中的并发值覆盖。
嗨,我有一个主题,我在 spring 中借助@KafkaListener 注释使用了这个主题的消息 我想从多线程侦听器中使用主题,我该如何实现?
ConcurrentKafkaListenerContainerFactory中有一个属性设置并发,但是这种方法正确吗? ConcurrentKafkaListenerContainerFactory.setConcurrency(3),一个监听器和三个不同的线程来消费一个主题?
是的,这将创建 3 个线程,但主题至少需要 3 个分区;您不能从单个分区消费多个消费者。
如果您有多个侦听器,则将并发添加为 @KafkaListener
的 属性。例如,
@KafkaListener(id = "consumer1", topics = {"topic1","topic2"}, groupId = "group1", concurrency = "4")
public void consumeLog1(){
}
@KafkaListener(id = "consumer2", topics = {"topic2","topic3"}, groupId = "group1", concurrency = "5")
public void consumeLog2(){
}
consumer1
将创建 4 个 KafkaConsumers
,consumer2
将创建 5 个 KafkaConsumers
。他们将听取分配给他们的主题。
如果您只有一个 @KafkaListener
,则将默认并发添加为 属性。
spring.kafka.listener.concurrency=3
此值将被 @KafkaListener
中的并发值覆盖。