Why are my Kafka consumers with the same group id not being balanced?

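I'm writing a proof-of-concept application that consumes messages from Apache Kafka 0.9.0.0, to see whether I can use it in place of a common JMS message broker, given the benefits Kafka provides. This is my base code, using the new consumer API: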

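import static java.util.Arrays.asList;

import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Main implements Runnable {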

    public static final long DEFAULT_POLL_TIME = 300;
    public static final String DEFAULT_GROUP_ID = "ltmjTest";

    volatile boolean keepRunning = true;
    private KafkaConsumer<String, Object> consumer;
    private String servers;
    private String groupId = DEFAULT_GROUP_ID;
    private long pollTime = DEFAULT_POLL_TIME;
    private String[] topics;

    public Main() {
    }

    //getters and setters...

    public void createConsumer() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        configs.put("enable.auto.commit", "true");
        configs.put("auto.commit.interval.ms", "1000");
        configs.put("session.timeout.ms", "30000");

        configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(asList(topics));
    }

    // The run() method required by Runnable is not shown in the post; this is
    // an assumed minimal poll loop so that the class compiles.
    @Override
    public void run() {
        try {
            while (keepRunning) {
                ConsumerRecords<String, Object> records = consumer.poll(pollTime);
                for (ConsumerRecord<String, Object> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }

    public static void main(String[] args) {
        Main main = new Main();
        if (args != null && args.length > 0) {
            for (String arg : args) {
                String[] realArg = arg.trim().split("=", 2);
                String argKey = realArg[0].toLowerCase();
                String argValue = realArg[1];
                switch (argKey) {
                case "polltime":
                    main.setPollTime(Long.parseLong(argValue));
                    break;
                case "groupid":
                    main.setGroupId(argValue);
                    break;
                case "servers":
                    main.setServers(argValue);
                    break;
                case "topics":
                    main.setTopics(argValue.split(","));
                    break;
                }
            }
        }
        main.createConsumer();
        new Thread(main).start();
        try (Scanner scanner = new Scanner(System.in)) {
            while(true) {
                String line = scanner.nextLine();
                if (line.equals("stop")) {
                    main.setKeepRunning(false);
                    break;
                }
            }
        }
    }
}

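I started a Kafka server with the default settings and a Kafka producer with the shell tool kafka-console-producer.sh to write messages to my topic. Then I connected two consumers using this code, passing the correct servers to connect to and the topic to subscribe to, and leaving everything else at its defaults, which means both consumers have the same group id. I noticed that only one of my consumers consumes all the data. I had read that, by default, the server should balance the load across the consumers. From the official tutorial: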

If all the consumer instances have the same consumer group, then this works just like a traditional queue balancing load over the consumers.

How can I make the consumers behave as described by default? Or am I missing something?

There is a trait, kafka.consumer.PartitionAssignor, that defines how partitions are assigned to each consumer. It has two implementations: RoundRobinAssignor and RangeAssignor. The default is RangeAssignor.

It can be changed by setting the "partition.assignment.strategy" parameter.
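As a rough sketch of that setting with the new consumer API: the class name org.apache.kafka.clients.consumer.RoundRobinAssignor is, as far as I know, the round-robin implementation shipped with the Java client. The helper name buildConfigs and the server and group values are placeholders of mine. The map uses plain string keys so that it compiles without Kafka on the classpath; it would be passed to new KafkaConsumer<>(configs) as in the question.

```java
import java.util.HashMap;
import java.util.Map;

class AssignorConfigSketch {

    // Hypothetical helper: the same kind of configs as createConsumer() in
    // the question, plus the partition assignment strategy.
    static Map<String, Object> buildConfigs(String servers, String groupId) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", servers);
        configs.put("group.id", groupId);
        configs.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Assumed class name of the round-robin assignor in the Java client.
        configs.put("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        return configs;
    }

    public static void main(String[] args) {
        // Placeholder server address and group id.
        System.out.println(buildConfigs("localhost:9092", "ltmjTest"));
    }
}
```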

Round-robin assignor documentation:

The roundrobin assignor lays out all the available partitions and all the available consumers. It then proceeds to do a roundrobin assignment from partition to consumer. If the subscriptions of all consumer instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts will be within a delta of exactly one across all consumers.) For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2. The assignment will be: C0: [t0p0, t0p2, t1p1] C1: [t0p1, t1p0, t1p2]
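The example in that quote can be reproduced with a few lines of plain Java. This is only a simulation of the idea, not Kafka's implementation: it assumes a non-empty consumer list in which every consumer subscribes to every topic, and the class and method names are mine.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RoundRobinSketch {

    // Deals the partitions out to the consumers one at a time, in order.
    static Map<String, List<String>> assign(List<String> consumers, List<String> partitions) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String consumer : consumers) {
            result.put(consumer, new ArrayList<>());
        }
        for (int i = 0; i < partitions.size(); i++) {
            String owner = consumers.get(i % consumers.size());
            result.get(owner).add(partitions.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("C0", "C1");
        List<String> partitions =
                Arrays.asList("t0p0", "t0p1", "t0p2", "t1p0", "t1p1", "t1p2");
        // prints {C0=[t0p0, t0p2, t1p1], C1=[t0p1, t1p0, t1p2]}
        System.out.println(assign(consumers, partitions));
    }
}
```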

Range assignor documentation:

The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order and the consumers in lexicographic order. We then divide the number of partitions by the total number of consumers to determine the number of partitions to assign to each consumer. If it does not evenly divide, then the first few consumers will have one extra partition. For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2. The assignment will be: C0: [t0p0, t0p1, t1p0, t1p1] C1: [t0p2, t1p2]
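The same kind of simulation for the range strategy, again a sketch with names of my own rather than Kafka's code. It assumes a non-empty consumer list, every consumer subscribed to every topic, and partitions numbered from 0. The second call shows what happens with a single-partition topic and two consumers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RangeSketch {

    // Per topic: each consumer (in sorted order) gets a contiguous range of
    // partitions; the first (count % consumers) consumers get one extra.
    static Map<String, List<String>> assign(List<String> consumers,
                                            Map<String, Integer> partitionsPerTopic) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String consumer : sorted) {
            result.put(consumer, new ArrayList<>());
        }
        for (Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
            int perConsumer = topic.getValue() / sorted.size();
            int extra = topic.getValue() % sorted.size();
            int next = 0; // first partition not yet assigned for this topic
            for (int i = 0; i < sorted.size(); i++) {
                int count = perConsumer + (i < extra ? 1 : 0);
                for (int p = next; p < next + count; p++) {
                    result.get(sorted.get(i)).add(topic.getKey() + "p" + p);
                }
                next += count;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("C0", "C1");
        Map<String, Integer> topics = new LinkedHashMap<>();
        topics.put("t0", 3);
        topics.put("t1", 3);
        // prints {C0=[t0p0, t0p1, t1p0, t1p1], C1=[t0p2, t1p2]}
        System.out.println(assign(consumers, topics));
        // One topic with one partition: prints {C0=[t0p0], C1=[]}
        System.out.println(assign(consumers, Collections.singletonMap("t0", 1)));
    }
}
```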

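So if each of our topics has only one partition, only one consumer will do any work: within a group, a partition is assigned to at most one consumer, so the second consumer gets nothing. That is likely what happens in the question, since a broker with default settings creates topics with a single partition (num.partitions defaults to 1). To spread the load across two consumers, the topic needs at least two partitions.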