使用 spring kafka 2.5.8 版本实现每个 kafka 主题分区一个消费者线程
Achieving one consumer thread per kafka topic partition with spring kafka 2.5.8 release
我一直在使用 apache kafka-clients(准确地说是 2.3.1 版本)库来创建 kafka 消费者,其中一个分区 - 一个消费者线程是通过以下计算实现的:
计算上的消费者线程数 * 计算数 = 主题的分区数
它曾经是手动缩放,因此当需要增加计算数量时,一个计算上的消费者线程运行 相应减少。
我们如何使用 org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
来实现这一点。
我正在尝试使用 spring kafka 2.5.8 版本。该应用程序在具有自动缩放功能的 k8s 集群上 运行。假设我将 max 和 min pods 设置为 4,那么理想情况下
4 X 消费者线程数 = 主题的分区数
这个消费者线程数是如何配置的。是不是通过这个:
org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory.setConcurrency()
。有人可以指导一下吗
是;或 @KafkaListener
上的 concurrency
属性 覆盖工厂的并发性。
如果您在运行时更改它,除非您 stop()
和 start()
容器,否则它不会生效。
我一直在使用 apache kafka-clients(准确地说是 2.3.1 版本)库来创建 kafka 消费者,其中一个分区 - 一个消费者线程是通过以下计算实现的:
计算上的消费者线程数 * 计算数 = 主题的分区数
它曾经是手动缩放,因此当需要增加计算数量时,一个计算上的消费者线程运行 相应减少。
我们如何使用 org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
来实现这一点。
我正在尝试使用 spring kafka 2.5.8 版本。该应用程序在具有自动缩放功能的 k8s 集群上 运行。假设我将 max 和 min pods 设置为 4,那么理想情况下
4 X 消费者线程数 = 主题的分区数
这个消费者线程数是如何配置的。是不是通过这个:
org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory.setConcurrency()
。有人可以指导一下吗
是;或 @KafkaListener
上的 concurrency
属性 覆盖工厂的并发性。
如果您在运行时更改它,除非您 stop()
和 start()
容器,否则它不会生效。