Spring Kafka MessageListenerContainer Resume/Pause # spring kafka
Spring Kafka MessageListenerContainer Resume/Pause # spring-kafka
由于原生 KafkaConsumer 不是线程安全的,因此不鼓励从不同的线程而不是 kafka-consumer 处理线程调用暂停和恢复方法。
但由于 spring-kafka 提供了另一层 KafkaMessageListenerContainer,它在内部使用 kafka-consumer。所以我的问题是我们可以使用 KafkaListenerEndpointRegistry 通过 id 获取侦听器容器并从其他线程而不是消费者处理线程调用 resume 或 pause 方法。
kafkaListenerEndpointRegistry.getListenerContainer("id").pause();
ExecutorService executorService = newFixedThreadPool(2);
executorService.submit(()->{
System.out.println("CurrentThread: {}" + Thread.currentThread().getId()+ " " + Thread.currentThread().getName());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
kafkaListenerEndpointRegistry.getListenerContainer("id").resume();
});
是; container.pause()
设置一个标志,告诉 Consumer
线程在下一次 poll()
调用之前暂停。同样,resume()
重置标志,因此消费者线程将在下一次轮询之前恢复 Consumer
。
由于原生 KafkaConsumer 不是线程安全的,因此不鼓励从不同的线程而不是 kafka-consumer 处理线程调用暂停和恢复方法。 但由于 spring-kafka 提供了另一层 KafkaMessageListenerContainer,它在内部使用 kafka-consumer。所以我的问题是我们可以使用 KafkaListenerEndpointRegistry 通过 id 获取侦听器容器并从其他线程而不是消费者处理线程调用 resume 或 pause 方法。
kafkaListenerEndpointRegistry.getListenerContainer("id").pause();
ExecutorService executorService = newFixedThreadPool(2);
executorService.submit(()->{
System.out.println("CurrentThread: {}" + Thread.currentThread().getId()+ " " + Thread.currentThread().getName());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
kafkaListenerEndpointRegistry.getListenerContainer("id").resume();
});
是; container.pause()
设置一个标志,告诉 Consumer
线程在下一次 poll()
调用之前暂停。同样,resume()
重置标志,因此消费者线程将在下一次轮询之前恢复 Consumer
。