为什么两个具有相同 group.id 的消费者在 Kafka 中收到相同的消息

Why two consumers with the same group.id receive the same message in Kafka

Spring-boot producer app 向主题==“Topic0”发送消息==“AAA”,密钥==“0”。消费者应用程序(也是 spring-boot)有 3 个 listeners/consumers,配置如下:

  1. 主题 0,分区 == 0,组 0
  2. 主题 0,分区 == 0,组 0
  3. 主题 0,分区 == 0,组 1
public static final String KAFKA_TOPIC_0 = "Topic0";
public static final String KAFKA_GROUP_ID_0 = "Group0";
public static final String KAFKA_GROUP_ID_1 = "Group1";
@KafkaListener(topicPartitions = @TopicPartition(topic = KAFKA_TOPIC_0, partitions = {"0"}), groupId = KAFKA_GROUP_ID_0)
public void listenTopic0Partition0GroupId0(String message,
                                           @Header(KafkaHeaders.GROUP_ID) String groupId,
                                           @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
    System.out.printf("listenTopic0Partition0GroupId0 topic %s partition %d group %s message %s \n",
            meta.topic(),
            meta.partition(),
            groupId,
            message);
}
@KafkaListener(topicPartitions = @TopicPartition(topic = KAFKA_TOPIC_0, partitions = {"0"}), groupId = KAFKA_GROUP_ID_0)
public void listen2Topic0Partition0GroupId0(String message,
                                            @Header(KafkaHeaders.GROUP_ID) String groupId,
                                            @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
    System.out.printf("listen2Topic0Partition0GroupId0 topic %s partition %d group %s message %s \n",
            meta.topic(),
            meta.partition(),
            groupId,
            message);
}
@KafkaListener(topicPartitions = @TopicPartition(topic = KAFKA_TOPIC_0, partitions = {"0"}), groupId = KAFKA_GROUP_ID_1)
public void listenTopic0Partition0GroupId1(String message,
                                           @Header(KafkaHeaders.GROUP_ID) String groupId,
                                           @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {

    System.out.printf("listenTopic0Partition0GroupId1 topic %s partition %d group %s message %s \n",
            meta.topic(),
            meta.partition(),
            groupId,
            message);
}

现在我发一条消息:

kafkaTemplate.send("Topic0", "0", "AAA");

我看到有 3 个听众收到了它:

listenTopic0Partition0GroupId1 topic Topic0 partition 0 group Group1 message AAA 
listenTopic0Partition0GroupId0 topic Topic0 partition 0 group Group0 message AAA 
listen2Topic0Partition0GroupId0 topic Topic0 partition 0 group Group0 message AAA

根据https://docs.confluent.io/5.4.1/kafka/introduction.html

Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.

If all the consumer instances have the same consumer group, then the records will effectively be load-balanced over the consumer instances.

If all the consumer instances have different consumer groups, then each record will be broadcast to all the consumer processes.

但是,同一组中收听相同主题和分区的 2 个消费者收到了相同的消息。怎么来的?

我也把主题配置:

@Bean
public NewTopic topicAAA() {
    return TopicBuilder.name(KAFKA_TOPIC_0)
            .partitions(2)
            .replicas(1)
            .build();
}

您不应将 groupIdpartitions 一起使用,因为 groupId 将被忽略。尝试删除 partitions 并检查。

这是您问题的实际答案

参考这个回答:

如果链接的答案有帮助,您可以为它点赞,更多信息请阅读下文。


为偶然发现为什么部分的人添加一些额外的细节:

对于 Kafka 消费者,我们有 assignsubscribeassign() 不使用组管理功能,而 subscribe() 使用。

正是在 assign 期间,我们手动给出了主题分区。对于 subscribe(),我们只提供主题名称。通过 subscribe() 将在组中的不同消费者之间分配主题分区将在内部进行处理。消费者心跳、再平衡等内容将作为此组管理功能的一部分出现。

由于您是手动使用partitions字段,因此很可能会调用assign()。有关这些方法,请参阅 KafkaConsumer


根据您的use-case,您可以选择决定是否要subscribe()

经验法则是,如果您希望针对同一主题集在多个消费者之间自动分配负载并跟踪偏移量直到您之前阅读的位置,然后使用 subscribe()

如果只是想看一眼数据,更多的是像kafka-console-consumer那样的查看目的,那就用assign().