Kafka消费者-消费者进程和线程与主题分区的关系是什么

Kafka consumer - what's the relation of consumer processes and threads with topic partitions

我最近一直在使用 Kafka,对消费者组下的消费者有些困惑。混淆的中心是将消费者实现为进程还是线程。对于这个问题,假设我使用的是高级消费者。

让我们考虑一个我试验过的场景。在我的主题中有 2 个分区(为简单起见,我们假设复制因子仅为 1)。我使用组 group1 创建了一个消费者 (ConsumerConnector) 进程 consumer1,然后创建了一个大小为 2 的主题计数图,然后生成了 2 个消费者线程 consumer1_thread1 和 [=15= 】 在那个过程下。看起来 consumer1_thread1 正在使用分区 0,而 consumer1_thread2 正在使用分区 1。这种行为总是确定性的吗?下面是代码片段。 Class TestConsumer 是我的消费者线程 class.

    ...
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(2));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    executor = Executors.newFixedThreadPool(2);

    int threadNumber = 0;
    for (final KafkaStream stream : streams) {
        executor.submit(new TestConsumer(stream, threadNumber));
        threadNumber++;
    }
    ...

现在,让我们考虑另一种情况(我没有试验过但很好奇),我启动了 2 个消费者进程 consumer1consumer2 都具有相同的组 group1 和他们每个人都是一个单线程进程。现在我的问题是:

  1. 在这种情况下,两个独立的消费者进程(尽管在同一组下)将如何与分区相关?和上面的单进程多线程场景有什么区别?

  2. 一般来说,消费者线程或进程如何映射/关联到主题中的分区?

  3. Kafka文档中确实说了一个消费者组下的每个消费者都会消费一个分区。但是,这是指消费者线程(如我上面的代码示例)还是独立的消费者进程?

  4. 关于将消费者实现为进程与线程,我在这里遗漏了什么微妙的东西吗?提前致谢。

一个消费者组可以有多个消费者实例运行(多个进程同一个group-id)。在消费 时,每个分区只被组中的一个消费者实例消费

例如如果您的主题包含 2 个分区,并且您使用 2 个消费者实例启动一个消费者组 group-A,那么它们中的每一个都将使用来自该主题的特定分区的消息。

如果您使用不同的组 ID group-A & group-B 启动相同的 2 个消费者,那么来自主题的两个分区的消息将被广播到它们中的每一个。因此,在这种情况下,group-A 下的消费者实例 运行 将具有来自主题的两个分区的消息,group-B 也是如此。

在他们的 documentation

上阅读更多相关信息

EDIT :根据您的评论,

I was wondering what is the effective difference between having 2 consumer threads under the same process as opposed to 2 consumer processes (group being the same in both cases)

消费者 group-id 在集群中 same/global。假设你已经启动了一个有 2 个线程的进程,然后生成了另一个进程(可能在不同的机器上)具有相同的 groupId 有 2 个线程,然后 kafka 将添加这 2 个新线程来使用来自主题的消息。所以最终会有 4 个线程负责从同一个主题中消费。然后 Kafka 将触发重新平衡以将分区重新分配给线程,因此可能会发生这样的情况:对于线程 T1 of process P1 使用的特定分区,可能会分配给线程 T2 of process P2 使用。下面几行摘自wiki页面

When a new process is started with the same Consumer Group name, Kafka will add that processes' threads to the set of threads available to consume the Topic and trigger a 're-balance'. During this re-balance Kafka will assign available partitions to available threads, possibly moving a partition to another process. If you have a mixture of old and new business logic, it is possible that some messages go to the old logic.

选择具有相同 ID 的多个消费者组实例与单个消费者组实例的主要设计决策是弹性。例如,如果您有一个具有两个线程的消费者,那么如果这台机器出现故障,您将失去所有消费者。如果您有两个具有相同 ID 的独立消费者组,每个消费者组都在不同的主机上,那么它们可以在失败后幸存下来。理想情况下,每个消费者组在上面应该有两个线程,因此如果一个主机宕机,另一个消费者组使用其休眠线程占用另一个分区。事实上,总是希望拥有比分区更多的线程来涵盖这个因素。

  1. 您可以 运行 每个消费者组在不同的主机上。对于给定 name/id 的单个消费者组,它只会 运行 在单个主机上,因为它在单个 运行 时间环境中管理其所有线程。
  2. Kafka 有一个算法来确定哪些 threads/consumer 组读取各种主题分区。 Kafka 试图以一种有弹性的方式均匀地分布这些。当一个消费者组失败时,它会启用其他组中的其他线程来读取给定的分区。
  3. 指消费者组中的单个线程。如果线程数多于分区数,那么其中一些线程将保持休眠状态,直到其他线程无法提供弹性。
  4. 偏好与弹性有关。因此,通过使用相同 ID 设置多个消费者组,我可以 运行 在多个主机上使我的应用程序能够容忍失败。

感谢@user2720864的详细回答,但我认为@user2720864回答中提到的re-allocation案例不正确=>一个分区不能被消费由两个消费者。

当有更多的消费者(与分区相比)时,每个分区将只分配给一个消费者,而剩下的个消费者将保持惰性,直到一些正在工作的消费者被已死亡或被移出群组。

基于Kafka Consumers document:

The way consumption is implemented in Kafka is by dividing up the partitions in the log over the consumer instances so that each instance is the exclusive consumer of a "fair share" of partitions at any point in time. This process of maintaining membership in the group is handled by the Kafka protocol dynamically. If new instances join the group they will take over some partitions from other members of the group; if an instance dies, its partitions will be distributed to the remaining instances.

还有它的 API specification 在 "Consumer Groups and Topic Subscriptions" 部分:

This is achieved by balancing the partitions between all members in the consumer group so that each partition is assigned to exactly one consumer in the group.