Spring 具有多个分区和多个侦听器的 Kafka

Spring Kafka with multiple Partition & multiple listener

我想为一个主题设置多个分区,比如 3 个分区。此外,同一组中的 3 个消费者会从这些分区读取数据(根据 kafka 设计的 1-1 映射)。

我的配置如下:

@Bean
public ConcurrentMessageListenerContainer<String, String> kafkaMessageContainer() throws Exception {
    
    //Setting 3 Partition
    ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset(topic, 0), new TopicPartitionOffset(topic, 1), new TopicPartitionOffset(topic, 2));
    //Group id is set same for multiple instances of this service
    containerProps.setGroupId(groupId);
    
    ConcurrentMessageListenerContainer<String, String> factory = new ConcurrentMessageListenerContainer <String, String>(kafkaConsumerFactory(kafkaBootStrapServers), containerProps);
    
    factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(KAFKA_RETRY_INTERVAL, KAFKA_RETRY_MAX_ATTEMPT)));
    //Setting 3 listeners
    factory.setConcurrency(3);
    
    return factory;
}
private ConsumerFactory<String, String> kafkaConsumerFactory(String kafkaBootStrapServers) {
        
    Map<String, Object> config = new HashMap<String, Object>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootStrapServers);
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    
    return new DefaultKafkaConsumerFactory<String, String>(config);
}

当我 运行 这是我服务的单个实例时,听众 运行 可以从各自的主题分区(无重复)中读取消息。但是,当我创建该服务的 2 个实例时,这两个实例的侦听器再次从同一主题分区读取。 这会导致重复处理同一条消息。

我的问题是,如果我在这两个实例中都没有更改监听器的组 ID,那么为什么不同实例中的监听器会再次阅读相同的消息。另外,我如何确保它像在单个实例中一样正常工作。 谢谢

ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset(topic, 0), new TopicPartitionOffset(topic, 1), new TopicPartitionOffset(topic, 2));

您正在手动分配分区,因此 group.id 与分配目的无关。

如果你想使用组管理在多个实例中分配分区,你只需要使用ContainerProperties containerProps = new ContainerProperties(topic);

对于并发数 = 3 的 2 个实例,您至少需要 6 个分区;否则你会有空闲的消费者。

当只有一个实例时,它将获得所有6个分区。