消费者线程数动态适应Kafka分区数
Dynamically adapt the number of consumer thread to the number of Kafka partitions
我有一个包含 50 个分区的 Kafka 主题。
我的 Spring 启动应用程序使用 Spring Kafka 以 @KafkaListener
读取这些消息
我的应用程序在我的 Kubernetes 中自动缩放的实例数。
默认情况下,似乎 Spring Kafka 每个主题启动 1 个消费者线程。
org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1
因此,对于一个唯一的应用程序实例,一个线程正在读取 50 个分区。
有 2 个实例,有一个负载平衡,每个实例侦听 25 个分区。每个实例仍然有 1 个线程。
我知道我可以使用 @KafkaListener
上的 concurrency
参数设置线程数。
但这是一个固定值。
有什么方法可以告诉 Spring 将消费者线程数动态调整为客户端当前正在侦听的分区数?
我认为可能有更好的方法来解决这个问题。
您应该通过负载/性能测试弄清楚应用程序的一个实例可以并行处理多少条记录/分区。
假设一个实例可以最佳地并行处理 10 个线程/记录。现在,如果您将应用程序扩展到 50 个实例,在您的方法中,每个实例将获得一个分区,并且每个实例的性能都将低于其容量,从而浪费资源。
现在考虑相反的情况 - 只剩下一个实例,它会生成 50 个线程以并行使用所有分区。应用程序的性能会有所下降,它可能会变得无响应甚至崩溃。
因此,在这种假设情况下,您可能想要做的是,例如,从一个或两个实例开始处理所有分区,每个分区有 10 个线程,如果存在消费者延迟,则将其扩展到最多 5 个实例, 这样每个分区都有专门的线程处理它。
同样,实际数字应通过负载/性能测试确定。
我有一个包含 50 个分区的 Kafka 主题。
我的 Spring 启动应用程序使用 Spring Kafka 以 @KafkaListener
我的应用程序在我的 Kubernetes 中自动缩放的实例数。
默认情况下,似乎 Spring Kafka 每个主题启动 1 个消费者线程。
org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1
因此,对于一个唯一的应用程序实例,一个线程正在读取 50 个分区。
有 2 个实例,有一个负载平衡,每个实例侦听 25 个分区。每个实例仍然有 1 个线程。
我知道我可以使用 @KafkaListener
上的 concurrency
参数设置线程数。
但这是一个固定值。
有什么方法可以告诉 Spring 将消费者线程数动态调整为客户端当前正在侦听的分区数?
我认为可能有更好的方法来解决这个问题。
您应该通过负载/性能测试弄清楚应用程序的一个实例可以并行处理多少条记录/分区。
假设一个实例可以最佳地并行处理 10 个线程/记录。现在,如果您将应用程序扩展到 50 个实例,在您的方法中,每个实例将获得一个分区,并且每个实例的性能都将低于其容量,从而浪费资源。
现在考虑相反的情况 - 只剩下一个实例,它会生成 50 个线程以并行使用所有分区。应用程序的性能会有所下降,它可能会变得无响应甚至崩溃。
因此,在这种假设情况下,您可能想要做的是,例如,从一个或两个实例开始处理所有分区,每个分区有 10 个线程,如果存在消费者延迟,则将其扩展到最多 5 个实例, 这样每个分区都有专门的线程处理它。
同样,实际数字应通过负载/性能测试确定。