当并发设置大于1时,如何暂停特定的kafka消费者线程?
How to pause a specific kafka consumer thread when concurrency is set to more than 1?
我正在使用 spring-kafka 2.2.8 并将并发设置为 2,如下所示,并试图了解如何在满足特定条件时暂停消费者 thread/instance。
@KafkaListener(id = "myConsumerId", topics = "myTopic", concurrency=2)
public void listen(String in) {
System.out.println(in);
}
现在,我有两个问题。
我的消费者是否会跨越两个不同的轮询线程来轮询记录?
如果我正在为消费者设置一个 id,如上所示。我如何暂停特定的消费者线程(并发设置为大于 1)。
请推荐。
使用 KafkaListenerEndpointRegistry.getListenerContainer(id)
方法获取对容器的引用。
将其转换为 ConcurrentMessageListenerContainer
并调用 getContainers()
以获取子 KafkaMessageListenerContainer
的列表;然后你可以 pause/resume 他们单独。
您可以使用 getAssignedPartitions()
.
来确定每个 topics/partitions
我正在使用 spring-kafka 2.2.8 并将并发设置为 2,如下所示,并试图了解如何在满足特定条件时暂停消费者 thread/instance。
@KafkaListener(id = "myConsumerId", topics = "myTopic", concurrency=2)
public void listen(String in) {
System.out.println(in);
}
现在,我有两个问题。
我的消费者是否会跨越两个不同的轮询线程来轮询记录?
如果我正在为消费者设置一个 id,如上所示。我如何暂停特定的消费者线程(并发设置为大于 1)。
请推荐。
使用 KafkaListenerEndpointRegistry.getListenerContainer(id)
方法获取对容器的引用。
将其转换为 ConcurrentMessageListenerContainer
并调用 getContainers()
以获取子 KafkaMessageListenerContainer
的列表;然后你可以 pause/resume 他们单独。
您可以使用 getAssignedPartitions()
.