Parallel processing and auto scaling in spring-kafka KafkaListener
I'm using spring-kafka to consume messages from two Kafka topics that send messages in the same format, as shown below.
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;

@KafkaListener(topics = {"topic_country1", "topic_country2"}, groupId = KafkaUtils.MESSAGE_GROUP)
public void onCustomerMessage(String message, Acknowledgment ack) throws Exception {
    log.info("Message : {} is received", message);
    ack.acknowledge();
}
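- Can the KafkaListener allocate consumer threads according to the number of topics it listens to, so that messages from both topics are processed in parallel? Or does it not support parallel processing, so that messages must wait in the topic until the current message has been processed?
- When the number of messages in the topics is high, I need to autoscale my microservice by starting new instances (up to the number of partitions). From the KafkaListener's point of view, which parameters (CPU, memory) can I rely on to detect that the number of messages in a topic is high? (For example, for an API I can autoscale the service by monitoring HTTP latency.)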
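You can set the concurrency property to run more threads, but each partition can only be processed by one thread. To increase concurrency, you must increase the number of partitions in each topic. When listening to several topics in the same listener, if those topics have only one partition each, you may not get the concurrency you want unless you change the Kafka consumer's partition assignor.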
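A minimal sketch of the question's listener with both changes applied, assuming spring-kafka 2.2.4 or later (where @KafkaListener has the concurrency and properties attributes) and a listener container factory already configured for manual acknowledgment. The class name CustomerMessageListener and the group id customer-group are hypothetical stand-ins for the question's own class and its KafkaUtils.MESSAGE_GROUP constant.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class CustomerMessageListener {

    // Hypothetical group id; the question uses KafkaUtils.MESSAGE_GROUP.
    private static final String GROUP_ID = "customer-group";

    @KafkaListener(
            topics = {"topic_country1", "topic_country2"},
            groupId = GROUP_ID,
            // Start two consumer threads in this instance; any thread beyond the
            // total number of partitions across both topics stays idle.
            concurrency = "2",
            // Use the Kafka client's RoundRobinAssignor for this listener so that
            // partitions are spread over all threads instead of split per topic.
            properties = "partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor")
    public void onCustomerMessage(String message, Acknowledgment ack) {
        System.out.println("Message received: " + message);
        // Assumes the container factory uses AckMode.MANUAL or MANUAL_IMMEDIATE.
        ack.acknowledge();
    }
}
```

Setting the strategy on the annotation affects only this listener's consumers; setting it on the consumer factory would apply it to every listener built from that factory. Every instance in the consumer group should be configured with the same strategy.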
When listening to multiple topics, the default partition distribution may not be what you expect. For example, if you have three topics with five partitions each and you want to use concurrency=15, you see only five active consumers, each assigned one partition from each topic, with the other 10 consumers being idle. This is because the default Kafka PartitionAssignor is the RangeAssignor (see its Javadoc). For this scenario, you may want to consider using the RoundRobinAssignor instead, which distributes the partitions across all of the consumers. Then, each consumer is assigned one topic or partition. ...
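To see why the assignor matters, here is a simplified model of the two strategies in plain Java. It is not the Kafka client's implementation: it assumes every consumer subscribes to the same topics and ignores stickiness, rack awareness and rebalancing. The topic names topicA to topicC are hypothetical, and the question's two topics are assumed to have one partition each.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class AssignorSketch {

    // Simplified range assignment: each topic is split independently into
    // contiguous ranges over the sorted consumer list.
    static Map<String, List<String>> range(List<String> consumers, Map<String, Integer> topics) {
        Map<String, List<String>> result = emptyAssignment(consumers);
        for (Map.Entry<String, Integer> t : topics.entrySet()) {
            int perConsumer = t.getValue() / consumers.size();
            int extra = t.getValue() % consumers.size();
            for (int i = 0; i < consumers.size(); i++) {
                int start = i * perConsumer + Math.min(i, extra);
                int length = perConsumer + (i < extra ? 1 : 0);
                for (int p = start; p < start + length; p++) {
                    result.get(consumers.get(i)).add(t.getKey() + "-" + p);
                }
            }
        }
        return result;
    }

    // Simplified round-robin assignment: all topic-partitions, sorted by topic
    // then partition, are dealt out one at a time across the sorted consumers.
    static Map<String, List<String>> roundRobin(List<String> consumers, Map<String, Integer> topics) {
        Map<String, List<String>> result = emptyAssignment(consumers);
        int next = 0;
        for (Map.Entry<String, Integer> t : topics.entrySet()) {
            for (int p = 0; p < t.getValue(); p++) {
                result.get(consumers.get(next % consumers.size())).add(t.getKey() + "-" + p);
                next++;
            }
        }
        return result;
    }

    static Map<String, List<String>> emptyAssignment(List<String> consumers) {
        Map<String, List<String>> result = new TreeMap<>();
        for (String c : consumers) {
            result.put(c, new ArrayList<>());
        }
        return result;
    }

    // Number of consumers that received at least one partition.
    static long active(Map<String, List<String>> assignment) {
        return assignment.values().stream().filter(parts -> !parts.isEmpty()).count();
    }

    // Zero-padded names so that sorted order matches creation order.
    static List<String> consumers(int n) {
        List<String> list = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            list.add(String.format("consumer-%02d", i));
        }
        return list;
    }

    public static void main(String[] args) {
        // Documentation example: three topics with five partitions each, concurrency=15.
        Map<String, Integer> docTopics = new TreeMap<>(Map.of("topicA", 5, "topicB", 5, "topicC", 5));
        List<String> fifteen = consumers(15);
        System.out.println("range active: " + active(range(fifteen, docTopics)));
        System.out.println("roundRobin active: " + active(roundRobin(fifteen, docTopics)));

        // Question's case, assuming one partition per topic and concurrency=2.
        Map<String, Integer> question = new TreeMap<>(Map.of("topic_country1", 1, "topic_country2", 1));
        List<String> two = consumers(2);
        System.out.println("range: " + range(two, question));
        System.out.println("roundRobin: " + roundRobin(two, question));
    }
}
```

With the documentation's numbers, range assignment leaves only 5 of the 15 consumers active, each holding one partition from each topic, while round robin gives every consumer exactly one partition. For the question's two single-partition topics with concurrency=2, range assignment puts both partitions on consumer-00 and leaves consumer-01 idle, whereas round robin gives each thread one topic.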