如何检查 Kafka 中的分区数(confluent_kafka)
How to check number of partitions in Kafka(confluent_kafka)
我每天都在 0 0 * * * 尝试使用 confluent_kafka Python 包进行批量 etl。我知道我的流中有 4 个分区,但它可以更改,所以有什么方法可以检查特定主题中的分区总数吗?
我的消费者就是这样;
from confluent_kafka import Consumer, KafkaError
messages = list()
partition_counter = 0
tnof_partition = 4
while True:
msg = self.consumer.poll(0.1)
if msg is None:
continue
elif not msg.error():
event = json.loads(msg.value().decode('utf-8'))
elif msg.error().code() == KafkaError._PARTITION_EOF:
print("End of partition reached {0}/{1}"
.format(msg.topic(), msg.partition()))
partition_counter += 1
if(partition_counter == tnof_partition):
self.consumer.commit()
self.consumer.close()
break
另外,如果您能展示实现批量消费者的替代方法,我将不胜感激。谢谢
Consumer 的 list_topics()
方法可以提供 Topics
的地图,其中包含 TopicMetadata
,最终包含 partitions
。
我每天都在 0 0 * * * 尝试使用 confluent_kafka Python 包进行批量 etl。我知道我的流中有 4 个分区,但它可以更改,所以有什么方法可以检查特定主题中的分区总数吗? 我的消费者就是这样;
from confluent_kafka import Consumer, KafkaError
messages = list()
partition_counter = 0
tnof_partition = 4
while True:
msg = self.consumer.poll(0.1)
if msg is None:
continue
elif not msg.error():
event = json.loads(msg.value().decode('utf-8'))
elif msg.error().code() == KafkaError._PARTITION_EOF:
print("End of partition reached {0}/{1}"
.format(msg.topic(), msg.partition()))
partition_counter += 1
if(partition_counter == tnof_partition):
self.consumer.commit()
self.consumer.close()
break
另外,如果您能展示实现批量消费者的替代方法,我将不胜感激。谢谢
Consumer 的 list_topics()
方法可以提供 Topics
的地图,其中包含 TopicMetadata
,最终包含 partitions
。