当并发设置大于1时,如何暂停特定的kafka消费者线程?

How to pause a specific kafka consumer thread when concurrency is set to more than 1?

我正在使用 spring-kafka 2.2.8 并将并发设置为 2,如下所示,并试图了解如何在满足特定条件时暂停消费者 thread/instance。

@KafkaListener(id = "myConsumerId", topics = "myTopic", concurrency=2)
    public void listen(String in) {
        System.out.println(in);
    }

现在,我有两个问题。

  1. 我的消费者是否会跨越两个不同的轮询线程来轮询记录?

  2. 如果我正在为消费者设置一个 id,如上所示。我如何暂停特定的消费者线程(并发设置为大于 1)。

请推荐。

使用 KafkaListenerEndpointRegistry.getListenerContainer(id) 方法获取对容器的引用。

将其转换为 ConcurrentMessageListenerContainer 并调用 getContainers() 以获取子 KafkaMessageListenerContainer 的列表;然后你可以 pause/resume 他们单独。

您可以使用 getAssignedPartitions().

来确定每个 topics/partitions