消费者如何绑定到指定的线程
How consumers bind to specified threads
spring-卡夫卡version:2.3.1
application.yml
spring:
kafka:
listener:
concurrency: 3
type: batch
在批处理消息中,希望能够绑定线程来处理不同的分区。
@KafkaListener(groupId = "service-order", topicPartitions = @TopicPartition(topic = "order", partitions = {"0"}))
public void listenTopicPartition(ConsumerRecord record) {
System.out.println(record);
}
@KafkaListener(groupId = "service-order", topicPartitions = @TopicPartition(topic = "order", partitions = {"1"}))
public void listenTopicPartition1(ConsumerRecord record) {
System.out.println(record);
}
@KafkaListener(groupId = "service-order", topicPartitions = @TopicPartition(topic = "order", partitions = {"2"}))
public void listenTopicPartition2(ConsumerRecord record) {
System.out.println(record);
}
根据你现在的配置,真的如你所愿。
你有什么问题吗?是什么让你带着这个问题来找我们?
我会像你一样做,并在测试期间检查所有这些分区是否真的在它们自己的线程中处理。
如果您的配置中有这样的逻辑,您就不需要 concurrency: 3
:每个 @KafkaListener
都提供自己的容器,并发性根据分配给该容器的分区进行分配。由于每个人都将只有一个分区,因此运行时并发性始终为 1
.
spring-卡夫卡version:2.3.1
application.yml
spring:
kafka:
listener:
concurrency: 3
type: batch
在批处理消息中,希望能够绑定线程来处理不同的分区。
@KafkaListener(groupId = "service-order", topicPartitions = @TopicPartition(topic = "order", partitions = {"0"}))
public void listenTopicPartition(ConsumerRecord record) {
System.out.println(record);
}
@KafkaListener(groupId = "service-order", topicPartitions = @TopicPartition(topic = "order", partitions = {"1"}))
public void listenTopicPartition1(ConsumerRecord record) {
System.out.println(record);
}
@KafkaListener(groupId = "service-order", topicPartitions = @TopicPartition(topic = "order", partitions = {"2"}))
public void listenTopicPartition2(ConsumerRecord record) {
System.out.println(record);
}
根据你现在的配置,真的如你所愿。
你有什么问题吗?是什么让你带着这个问题来找我们? 我会像你一样做,并在测试期间检查所有这些分区是否真的在它们自己的线程中处理。
如果您的配置中有这样的逻辑,您就不需要 concurrency: 3
:每个 @KafkaListener
都提供自己的容器,并发性根据分配给该容器的分区进行分配。由于每个人都将只有一个分区,因此运行时并发性始终为 1
.