一些 Python Confluent Kafka Consumers 会留下来 idle/unassigned 即使其他人 overloaded/over-assigned

Some Python Confluent Kafka Consumers are staying idle/unassigned even though others overloaded/over-assigned

设置:

我使用非常标准的订阅代码:

def __init__(self, kafka_broker_list: str, group_id: str, topics: List[str]):
        from confluent_kafka import Consumer
        self._consumer = Consumer({
            'bootstrap.servers': kafka_broker_list,
            'fetch.max.bytes': 50 * 1024 * 1024,  # 50MB
            'auto.offset.reset': 'earliest',
            'group.id': group_id,
            'enable.auto.commit': True
        })
        logging.info(f"Subscribing for topics: {topics}")
        self._consumer.subscribe(topics, on_assign=self._on_assign, on_revoke=self._on_revoke)

问题: 在我启动的 120 个消费者中,只有 84 个(与最大主题的分区数相同)获得分区分配 - 其他人没有任何分区分配,因此保持空闲状态。更糟糕的是,我通常有 5 个消费者分配了大约 10 个分区,一些有 8 个,很多有 2-3-4,还有很多消费者只分配了一个分区。 我相信“第一个”消费者订阅,获得最多的主题,直到每个主题的可用分区都用完。

题目:

  1. 我读到 partition.assignment.strategy 配置 属性 可供 Java 消费者使用,但是我在 Confluent Kafka 客户端中找不到它。那么有没有办法在Confluent Kafka Python Client中配置分配策略?
  2. 有没有办法在服务器上设置分区分配策略,或者按主题或按组 ID?
  3. 或者,是否有不同的方式在所有消费者之间分配负载?

感谢您花时间阅读我的问题:)

confluent-kafka python 客户端在内部使用 librdkafka 库,它实际上允许配置分配策略。目前支持两种分配策略——“range”——默认的,以及解决我描述的问题的“roundrobin”。

它是通过在消费者配置中添加以下配置属性来配置的:

'partition.assignment.strategy': 'roundrobin',

所有 librdkafka 属性的文档可在此处获得: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md