消费者如何绑定到指定的线程

How consumers bind to specified threads

spring-卡夫卡version:2.3.1

application.yml

spring:
  kafka:
    listener:
      concurrency: 3
      type: batch

在批处理消息中,希望能够绑定线程来处理不同的分区。

@KafkaListener(groupId = "service-order", topicPartitions = @TopicPartition(topic = "order", partitions = {"0"}))
public void listenTopicPartition(ConsumerRecord record) {
    System.out.println(record);
}

@KafkaListener(groupId = "service-order", topicPartitions = @TopicPartition(topic = "order", partitions = {"1"}))
public void listenTopicPartition1(ConsumerRecord record) {
    System.out.println(record);
}

@KafkaListener(groupId = "service-order", topicPartitions = @TopicPartition(topic = "order", partitions = {"2"}))
public void listenTopicPartition2(ConsumerRecord record) {
    System.out.println(record);
}

根据你现在的配置,真的如你所愿。

你有什么问题吗?是什么让你带着这个问题来找我们? 我会像你一样做,并在测试期间检查所有这些分区是否真的在它们自己的线程中处理。

如果您的配置中有这样的逻辑,您就不需要 concurrency: 3:每个 @KafkaListener 都提供自己的容器,并发性根据分配给该容器的分区进行分配。由于每个人都将只有一个分区,因此运行时并发性始终为 1.