从多个 Kafka 主题中同时读取 1 条消息
Read 1 message concurrently from multiple Kafka topics
我将 Kafka Listener 的并发设置为 1。
ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>>
factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(conncurrency);
factory.setConsumerFactory(consumerFactory());
factory.setRetryTemplate(retryTemplate());
我正在听 3 个不同的话题
@KafkaListener(topics = "#{'${kafka.consumer.topic.name}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
public void listen(@Payload Map<String, Object> conciseMap,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) int offset,
Acknowledgment ack) {
processMessage(conciseMap,partition,offset,ack,false);
}
在这种情况下,侦听器是否会从第一个主题读取一条消息,并在处理完后从下一个主题读取一条消息,依此类推?或者它会同时处理来自每个主题的 1 条消息。
如果是前者,有没有办法在不创建多个监听器的情况下同时从所有主题中读取 1 条消息?
您不需要创建多个侦听器 - 您只需要与您在所有主题中提供的分区一样大的并发性,甚至更多。
将会有如此多的 KafkaMessageListenerContainer
旋转,并且每个都将在自己的线程中工作。您仍然可以使用相同的 @KafkaListener
方法。只要你在那里是无状态的,你就没有任何并发问题。
没有 gua运行知道 Kafka 代理将如何跨容器线程分配分区;如果你只有一个分区;它们可能都会分配给同一个容器线程。这就是我 运行 容器并发=3 的测试时发生的事情...
2017-10-31 16:40:26.066 INFO 35202 --- [ntainer#0-2-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[]
2017-10-31 16:40:26.066 INFO 35202 --- [ntainer#0-1-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[]
2017-10-31 16:40:26.079 INFO 35202 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[bar-0, foo-0, baz-0]
每个主题有 10 个分区,我得到了这个分布...
2017-10-31 16:46:19.279 INFO 35900 --- [ntainer#0-1-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[foo10-5, foo10-6, foo10-4, baz10-5, baz10-4, baz10-6, bar10-5, bar10-4, bar10-6]
2017-10-31 16:46:19.279 INFO 35900 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[bar10-1, bar10-0, bar10-3, bar10-2, baz10-1, baz10-0, baz10-3, baz10-2, foo10-3, foo10-1, foo10-2, foo10-0]
2017-10-31 16:46:19.279 INFO 35900 --- [ntainer#0-2-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[baz10-9, baz10-8, baz10-7, bar10-9, bar10-8, foo10-9, bar10-7, foo10-7, foo10-8]
如您所见,每个主题的一些分区已分配给每个线程。但是其中两个线程总共有 9 个分区,而一个线程有 12 个。
如果您想要完全控制,我会建议每个主题都有一个侦听器。
我将 Kafka Listener 的并发设置为 1。
ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>>
factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(conncurrency);
factory.setConsumerFactory(consumerFactory());
factory.setRetryTemplate(retryTemplate());
我正在听 3 个不同的话题
@KafkaListener(topics = "#{'${kafka.consumer.topic.name}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
public void listen(@Payload Map<String, Object> conciseMap,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) int offset,
Acknowledgment ack) {
processMessage(conciseMap,partition,offset,ack,false);
}
在这种情况下,侦听器是否会从第一个主题读取一条消息,并在处理完后从下一个主题读取一条消息,依此类推?或者它会同时处理来自每个主题的 1 条消息。
如果是前者,有没有办法在不创建多个监听器的情况下同时从所有主题中读取 1 条消息?
您不需要创建多个侦听器 - 您只需要与您在所有主题中提供的分区一样大的并发性,甚至更多。
将会有如此多的 KafkaMessageListenerContainer
旋转,并且每个都将在自己的线程中工作。您仍然可以使用相同的 @KafkaListener
方法。只要你在那里是无状态的,你就没有任何并发问题。
没有 gua运行知道 Kafka 代理将如何跨容器线程分配分区;如果你只有一个分区;它们可能都会分配给同一个容器线程。这就是我 运行 容器并发=3 的测试时发生的事情...
2017-10-31 16:40:26.066 INFO 35202 --- [ntainer#0-2-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[]
2017-10-31 16:40:26.066 INFO 35202 --- [ntainer#0-1-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[]
2017-10-31 16:40:26.079 INFO 35202 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[bar-0, foo-0, baz-0]
每个主题有 10 个分区,我得到了这个分布...
2017-10-31 16:46:19.279 INFO 35900 --- [ntainer#0-1-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[foo10-5, foo10-6, foo10-4, baz10-5, baz10-4, baz10-6, bar10-5, bar10-4, bar10-6]
2017-10-31 16:46:19.279 INFO 35900 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[bar10-1, bar10-0, bar10-3, bar10-2, baz10-1, baz10-0, baz10-3, baz10-2, foo10-3, foo10-1, foo10-2, foo10-0]
2017-10-31 16:46:19.279 INFO 35900 --- [ntainer#0-2-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[baz10-9, baz10-8, baz10-7, bar10-9, bar10-8, foo10-9, bar10-7, foo10-7, foo10-8]
如您所见,每个主题的一些分区已分配给每个线程。但是其中两个线程总共有 9 个分区,而一个线程有 12 个。
如果您想要完全控制,我会建议每个主题都有一个侦听器。