Spring 具有多个分区和多个侦听器的 Kafka
Spring Kafka with multiple Partition & multiple listener
我想为一个主题设置多个分区,比如 3 个分区。此外,同一组中的 3 个消费者会从这些分区读取数据(根据 kafka 设计的 1-1 映射)。
我的配置如下:
@Bean
public ConcurrentMessageListenerContainer<String, String> kafkaMessageContainer() throws Exception {
//Setting 3 Partition
ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset(topic, 0), new TopicPartitionOffset(topic, 1), new TopicPartitionOffset(topic, 2));
//Group id is set same for multiple instances of this service
containerProps.setGroupId(groupId);
ConcurrentMessageListenerContainer<String, String> factory = new ConcurrentMessageListenerContainer <String, String>(kafkaConsumerFactory(kafkaBootStrapServers), containerProps);
factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(KAFKA_RETRY_INTERVAL, KAFKA_RETRY_MAX_ATTEMPT)));
//Setting 3 listeners
factory.setConcurrency(3);
return factory;
}
private ConsumerFactory<String, String> kafkaConsumerFactory(String kafkaBootStrapServers) {
Map<String, Object> config = new HashMap<String, Object>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootStrapServers);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
return new DefaultKafkaConsumerFactory<String, String>(config);
}
当我 运行 这是我服务的单个实例时,听众 运行 可以从各自的主题分区(无重复)中读取消息。但是,当我创建该服务的 2 个实例时,这两个实例的侦听器再次从同一主题分区读取。
这会导致重复处理同一条消息。
我的问题是,如果我在这两个实例中都没有更改监听器的组 ID,那么为什么不同实例中的监听器会再次阅读相同的消息。另外,我如何确保它像在单个实例中一样正常工作。
谢谢
ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset(topic, 0), new TopicPartitionOffset(topic, 1), new TopicPartitionOffset(topic, 2));
您正在手动分配分区,因此 group.id
与分配目的无关。
如果你想使用组管理在多个实例中分配分区,你只需要使用ContainerProperties containerProps = new ContainerProperties(topic);
。
对于并发数 = 3 的 2 个实例,您至少需要 6 个分区;否则你会有空闲的消费者。
当只有一个实例时,它将获得所有6个分区。
我想为一个主题设置多个分区,比如 3 个分区。此外,同一组中的 3 个消费者会从这些分区读取数据(根据 kafka 设计的 1-1 映射)。
我的配置如下:
@Bean
public ConcurrentMessageListenerContainer<String, String> kafkaMessageContainer() throws Exception {
//Setting 3 Partition
ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset(topic, 0), new TopicPartitionOffset(topic, 1), new TopicPartitionOffset(topic, 2));
//Group id is set same for multiple instances of this service
containerProps.setGroupId(groupId);
ConcurrentMessageListenerContainer<String, String> factory = new ConcurrentMessageListenerContainer <String, String>(kafkaConsumerFactory(kafkaBootStrapServers), containerProps);
factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(KAFKA_RETRY_INTERVAL, KAFKA_RETRY_MAX_ATTEMPT)));
//Setting 3 listeners
factory.setConcurrency(3);
return factory;
}
private ConsumerFactory<String, String> kafkaConsumerFactory(String kafkaBootStrapServers) {
Map<String, Object> config = new HashMap<String, Object>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootStrapServers);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
return new DefaultKafkaConsumerFactory<String, String>(config);
}
当我 运行 这是我服务的单个实例时,听众 运行 可以从各自的主题分区(无重复)中读取消息。但是,当我创建该服务的 2 个实例时,这两个实例的侦听器再次从同一主题分区读取。 这会导致重复处理同一条消息。
我的问题是,如果我在这两个实例中都没有更改监听器的组 ID,那么为什么不同实例中的监听器会再次阅读相同的消息。另外,我如何确保它像在单个实例中一样正常工作。 谢谢
ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset(topic, 0), new TopicPartitionOffset(topic, 1), new TopicPartitionOffset(topic, 2));
您正在手动分配分区,因此 group.id
与分配目的无关。
如果你想使用组管理在多个实例中分配分区,你只需要使用ContainerProperties containerProps = new ContainerProperties(topic);
。
对于并发数 = 3 的 2 个实例,您至少需要 6 个分区;否则你会有空闲的消费者。
当只有一个实例时,它将获得所有6个分区。