如何使用 Spring 提供的 Kafka api 在消费组中创建多个消费者
How to create multiple consumers in a consume group using Spring provided Kafka apis
我正在尝试在一个消费者组中创建多个消费者以进行并行处理,因为我们有大量的消息流入。我正在使用 spring 引导和 KafkTemplate。我们如何在 spring 引导应用程序的单个实例中创建属于单个消费者组的多个消费者?
使用@KafkaListener 注释的多个方法是否会创建多个消费者?
你必须使用 ConcurrentMessageListenerContainer
。它委托给一个或多个 KafkaMessageListenerContainer
实例以提供多线程消费。
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(10);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
factory.setConcurrency(10) 创建 10 个 KafkaMessageListenerContainer
实例。每个实例都有一定数量的分区。这取决于您创建主题时配置的分区数。
一些准备步骤:
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
private final static String BOOTSTRAP_ADDRESS = "localhost:9092";
private final static String CONSUMER_GROUP = "consumer-group-1";
private final static String TOPIC = "test-topic";
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_ADDRESS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@KafkaListener(topics = TOPIC, containerFactory = "kafkaListenerContainerFactory")
public void listen(@Payload String message) {
logger.info(message);
}
public void start() {
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < 10; i++) {
kafkaTemplate.send(TOPIC, i, String.valueOf(i), "Message " + i);
}
logger.info("All message are sent");
}
如果您 运行 上面的方法,您可以看到每个 KafkaMessageListenerContainer
实例处理被放入该实例所服务的分区中的消息。
添加Thread.sleep()等待消费者初始化。
2020-07-01 15:48:34.800 INFO 201566 --- [ntainer#0-4-C-1] r.s.c.KafkaConsumersDemo : Message 5
2020-07-01 15:48:34.801 INFO 201566 --- [ntainer#0-6-C-1] r.s.c.KafkaConsumersDemo : Message 7
2020-07-01 15:48:34.800 INFO 201566 --- [ntainer#0-7-C-1] r.s.c.KafkaConsumersDemo : Message 8
2020-07-01 15:48:34.800 INFO 201566 --- [ntainer#0-9-C-1] r.s.c.KafkaConsumersDemo : Message 1
2020-07-01 15:48:34.800 INFO 201566 --- [ntainer#0-0-C-1] r.s.c.KafkaConsumersDemo : Message 0
2020-07-01 15:48:34.800 INFO 201566 --- [ntainer#0-8-C-1] r.s.c.KafkaConsumersDemo : Message 9
2020-07-01 15:48:34.800 INFO 201566 --- [ntainer#0-3-C-1] r.s.c.KafkaConsumersDemo : Message 4
2020-07-01 15:48:34.801 INFO 201566 --- [ntainer#0-2-C-1] r.s.c.KafkaConsumersDemo : Message 3
2020-07-01 15:48:34.801 INFO 201566 --- [ntainer#0-1-C-1] r.s.c.KafkaConsumersDemo : Message 2
2020-07-01 15:48:34.800 INFO 201566 --- [ntainer#0-5-C-1] r.s.c.KafkaConsumersDemo : Message 6
是的,@KafkaListener
将为您创建多个消费者。
有了它,您可以将它们全部配置为使用相同的主题并属于同一组。
Kafka 协调器会将分区分配给您的消费者。
虽然如果主题中只有一个分区,并发不会发生:单个分区在单个线程中处理。
另一种选择确实是配置 concurrency
并且将根据 concurrency <-> partition
状态再次创建多个消费者。
正如@Salavat Yalalo 所建议的那样,我将我的 Kafka 容器工厂设为 ConcurrentKafkaListenerContainerFactory
。在@KafkaListenere 方法上,我添加了名为 concurrency 的选项,它接受一个整数作为字符串,表示要跨越的消费者数量,如下所示
@KafakListener(concurrency ="4", containerFactory="concurrentKafkaListenerContainerFactory(bean name of the factory)",..other optional values)
public void topicConsumer(Message<MyObject> myObject){
//.....
}
当运行时,我看到在一个消费者组中创建了 4 个消费者。
我正在尝试在一个消费者组中创建多个消费者以进行并行处理,因为我们有大量的消息流入。我正在使用 spring 引导和 KafkTemplate。我们如何在 spring 引导应用程序的单个实例中创建属于单个消费者组的多个消费者? 使用@KafkaListener 注释的多个方法是否会创建多个消费者?
你必须使用 ConcurrentMessageListenerContainer
。它委托给一个或多个 KafkaMessageListenerContainer
实例以提供多线程消费。
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(10);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
factory.setConcurrency(10) 创建 10 个 KafkaMessageListenerContainer
实例。每个实例都有一定数量的分区。这取决于您创建主题时配置的分区数。
一些准备步骤:
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
private final static String BOOTSTRAP_ADDRESS = "localhost:9092";
private final static String CONSUMER_GROUP = "consumer-group-1";
private final static String TOPIC = "test-topic";
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_ADDRESS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@KafkaListener(topics = TOPIC, containerFactory = "kafkaListenerContainerFactory")
public void listen(@Payload String message) {
logger.info(message);
}
public void start() {
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < 10; i++) {
kafkaTemplate.send(TOPIC, i, String.valueOf(i), "Message " + i);
}
logger.info("All message are sent");
}
如果您 运行 上面的方法,您可以看到每个 KafkaMessageListenerContainer
实例处理被放入该实例所服务的分区中的消息。
添加Thread.sleep()等待消费者初始化。
2020-07-01 15:48:34.800 INFO 201566 --- [ntainer#0-4-C-1] r.s.c.KafkaConsumersDemo : Message 5
2020-07-01 15:48:34.801 INFO 201566 --- [ntainer#0-6-C-1] r.s.c.KafkaConsumersDemo : Message 7
2020-07-01 15:48:34.800 INFO 201566 --- [ntainer#0-7-C-1] r.s.c.KafkaConsumersDemo : Message 8
2020-07-01 15:48:34.800 INFO 201566 --- [ntainer#0-9-C-1] r.s.c.KafkaConsumersDemo : Message 1
2020-07-01 15:48:34.800 INFO 201566 --- [ntainer#0-0-C-1] r.s.c.KafkaConsumersDemo : Message 0
2020-07-01 15:48:34.800 INFO 201566 --- [ntainer#0-8-C-1] r.s.c.KafkaConsumersDemo : Message 9
2020-07-01 15:48:34.800 INFO 201566 --- [ntainer#0-3-C-1] r.s.c.KafkaConsumersDemo : Message 4
2020-07-01 15:48:34.801 INFO 201566 --- [ntainer#0-2-C-1] r.s.c.KafkaConsumersDemo : Message 3
2020-07-01 15:48:34.801 INFO 201566 --- [ntainer#0-1-C-1] r.s.c.KafkaConsumersDemo : Message 2
2020-07-01 15:48:34.800 INFO 201566 --- [ntainer#0-5-C-1] r.s.c.KafkaConsumersDemo : Message 6
是的,@KafkaListener
将为您创建多个消费者。
有了它,您可以将它们全部配置为使用相同的主题并属于同一组。 Kafka 协调器会将分区分配给您的消费者。
虽然如果主题中只有一个分区,并发不会发生:单个分区在单个线程中处理。
另一种选择确实是配置 concurrency
并且将根据 concurrency <-> partition
状态再次创建多个消费者。
正如@Salavat Yalalo 所建议的那样,我将我的 Kafka 容器工厂设为 ConcurrentKafkaListenerContainerFactory
。在@KafkaListenere 方法上,我添加了名为 concurrency 的选项,它接受一个整数作为字符串,表示要跨越的消费者数量,如下所示
@KafakListener(concurrency ="4", containerFactory="concurrentKafkaListenerContainerFactory(bean name of the factory)",..other optional values)
public void topicConsumer(Message<MyObject> myObject){
//.....
}
当运行时,我看到在一个消费者组中创建了 4 个消费者。