Spring Kafka MessageListenerContainer Resume/Pause # spring kafka

Spring Kafka MessageListenerContainer Resume/Pause # spring-kafka

由于原生 KafkaConsumer 不是线程安全的,因此不鼓励从不同的线程而不是 kafka-consumer 处理线程调用暂停和恢复方法。 但由于 spring-kafka 提供了另一层 KafkaMessageListenerContainer,它在内部使用 kafka-consumer。所以我的问题是我们可以使用 KafkaListenerEndpointRegistry 通过 id 获取侦听器容器并从其他线程而不是消费者处理线程调用 resume 或 pause 方法。

kafkaListenerEndpointRegistry.getListenerContainer("id").pause();
       

    ExecutorService executorService  = newFixedThreadPool(2);
    executorService.submit(()->{

        System.out.println("CurrentThread: {}" + Thread.currentThread().getId()+ " " + Thread.currentThread().getName());
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        kafkaListenerEndpointRegistry.getListenerContainer("id").resume();
    });

是; container.pause() 设置一个标志,告诉 Consumer 线程在下一次 poll() 调用之前暂停。同样,resume() 重置标志,因此消费者线程将在下一次轮询之前恢复 Consumer