spring-kafka KafkaListener 中的并行处理和自动缩放

Parallel processing and auto scaling in spring-kafka KafkaListener

我正在使用 spring-kafka 来消费来自两个 Kafka 主题的消息,它们发送的消息格式与下面相同。

    @KafkaListener(topics = {"topic_country1", "topic_country2"}, groupId = KafkaUtils.MESSAGE_GROUP)
    public void onCustomerMessage(String message, Acknowledgment ack) throws Exception {
        log.info("Message : {}  is received", message);
        ack.acknowledge();
    }

您可以将 concurrency 属性 设置为 运行 更多线程;但每个分区只能由一个线程处理。要增加并发性,您必须增加每个主题中的分区数。在同一个监听器中监听多个主题时,如果这些主题只有一个分区,除非您更改 kafka 消费者分区分配器,否则您可能无法获得您想要的并发。

https://docs.spring.io/spring-kafka/docs/2.5.0.RELEASE/reference/html/#using-ConcurrentMessageListenerContainer

When listening to multiple topics, the default partition distribution may not be what you expect. For example, if you have three topics with five partitions each and you want to use concurrency=15, you see only five active consumers, each assigned one partition from each topic, with the other 10 consumers being idle. This is because the default Kafka PartitionAssignor is the RangeAssignor (see its Javadoc). For this scenario, you may want to consider using the RoundRobinAssignor instead, which distributes the partitions across all of the consumers. Then, each consumer is assigned one topic or partition. ...