使用 spring kafka 2.5.8 版本实现每个 kafka 主题分区一个消费者线程

Achieving one consumer thread per kafka topic partition with spring kafka 2.5.8 release

我一直在使用 apache kafka-clients(准确地说是 2.3.1 版本)库来创建 kafka 消费者,其中一个分区 - 一个消费者线程是通过以下计算实现的:

计算上的消费者线程数 * 计算数 = 主题的分区数

它曾经是手动缩放,因此当需要增加计算数量时,一个计算上的消费者线程运行 相应减少。

我们如何使用 org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory 来实现这一点。

我正在尝试使用 spring kafka 2.5.8 版本。该应用程序在具有自动缩放功能的 k8s 集群上 运行。假设我将 max 和 min pods 设置为 4,那么理想情况下

4 X 消费者线程数 = 主题的分区数

这个消费者线程数是如何配置的。是不是通过这个:

org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory.setConcurrency()。有人可以指导一下吗

是;或 @KafkaListener 上的 concurrency 属性 覆盖工厂的并发性。

如果您在运行时更改它,除非您 stop()start() 容器,否则它不会生效。