如何检查 Kafka 中的分区数(confluent_kafka)

How to check number of partitions in Kafka(confluent_kafka)

我每天都在 0 0 * * * 尝试使用 confluent_kafka Python 包进行批量 etl。我知道我的流中有 4 个分区,但它可以更改,所以有什么方法可以检查特定主题中的分区总数吗? 我的消费者就是这样;

from confluent_kafka import Consumer, KafkaError

    messages = list()
    partition_counter = 0
    tnof_partition = 4

    while True:
        msg = self.consumer.poll(0.1)
        if msg is None:
            continue
        elif not msg.error():
            event = json.loads(msg.value().decode('utf-8'))

        elif msg.error().code() == KafkaError._PARTITION_EOF:
            print("End of partition reached {0}/{1}"
                .format(msg.topic(), msg.partition()))
            
            partition_counter += 1
            if(partition_counter == tnof_partition):
                self.consumer.commit()
                self.consumer.close()
                break

另外,如果您能展示实现批量消费者的替代方法,我将不胜感激。谢谢

Consumer 的 list_topics() 方法可以提供 Topics 的地图,其中包含 TopicMetadata,最终包含 partitions

参考:https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.Consumer.list_topics