How to programmatically get the latest offset per Kafka topic partition in Python
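I'm new to Kafka and want to get the position of a Kafka topic for each partition.
The documentation (https://kafka-python.readthedocs.io/en/master/apidoc/KafkaAdminClient.html#kafkaadminclient) shows that offsets can be retrieved with the function KafkaAdminClient.list_consumer_group_offsets, but I don't see a comparable method there for the position.
Does anyone know how I can get it?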
You can use position:
Retrieve current positions (offsets) for the list of partitions.
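from confluent_kafka import Consumer, TopicPartition
# The consumer needs a group.id; "offset-reader" is only a placeholder name.
consumer = Consumer({"bootstrap.servers": "localhost:9092", "group.id": "offset-reader"})
# Fetch the topic metadata and build one TopicPartition per partition.
topic = consumer.list_topics(topic='topicName')
partitions = [TopicPartition('topicName', partition) for partition in topic.topics['topicName'].partitions]
# position() reports this consumer's own position, so it is only meaningful for partitions it has consumed from.
offset_per_partition = consumer.position(partitions)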
Alternatively, you can use get_watermark_offsets, but you have to pass one partition at a time, so it takes multiple calls:
Retrieve low and high offsets for partition.
from confluent_kafka import Consumer, TopicPartition
# The consumer needs a group.id; "offset-reader" is only a placeholder name.
consumer = Consumer({"bootstrap.servers": "localhost:9092", "group.id": "offset-reader"})
topic = consumer.list_topics(topic='topicName')
partitions = [TopicPartition('topicName', partition) for partition in topic.topics['topicName'].partitions]
for p in partitions:
    # One broker query per partition; the high watermark is the latest offset.
    low_offset, high_offset = consumer.get_watermark_offsets(p)
    print(f"Latest offset for partition {p.partition}: {high_offset}")
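The per-partition loop above could also be wrapped in a small helper that returns a dict instead of printing. This is only a sketch under a few assumptions: `high_watermarks` is a hypothetical name, the 5 second timeout is an arbitrary choice, and it relies on get_watermark_offsets returning None on timeout, as the confluent_kafka documentation describes. The TopicPartition class is passed in as an argument so the function depends only on the consumer methods already used in this answer.

```python
def high_watermarks(consumer, topic_partition_cls, topic, timeout=5.0):
    """Return {partition_id: high_watermark} for one topic.

    `consumer` is expected to behave like confluent_kafka.Consumer and
    `topic_partition_cls` like confluent_kafka.TopicPartition
    (both are assumptions of this sketch).
    """
    # Ask the cluster which partitions the topic has.
    metadata = consumer.list_topics(topic=topic, timeout=timeout)
    result = {}
    for partition_id in sorted(metadata.topics[topic].partitions):
        tp = topic_partition_cls(topic, partition_id)
        # One query per partition, as in the loop above.
        offsets = consumer.get_watermark_offsets(tp, timeout=timeout)
        if offsets is None:
            # Timed out: skip this partition rather than guess a value.
            continue
        _low, high = offsets
        result[partition_id] = high
    return result
```

With a real consumer this would be called as high_watermarks(consumer, TopicPartition, 'topicName'). Partitions whose query timed out are simply missing from the result, so a caller can tell "unknown" apart from an offset of 0.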
You can use end_offsets:
Get the last offset for the given partitions. The last offset of a
partition is the offset of the upcoming message, i.e. the offset of
the last available message + 1.
This method does not change the current consumer position of the
partitions.
from kafka import KafkaConsumer, TopicPartition
consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
# Build one TopicPartition per partition of the topic.
partitions = [TopicPartition('myTopic', p) for p in consumer.partitions_for_topic('myTopic')]
# Maps each TopicPartition to its end offset (last available offset + 1).
last_offset_per_partition = consumer.end_offsets(partitions)
If you want to iterate over all topics, you can use the following:
from kafka import KafkaConsumer, TopicPartition
consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
kafka_topics = consumer.topics()
for topic in kafka_topics:
    partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
    # Overwritten on every iteration; store or print it here if you need all topics.
    last_offset_per_partition = consumer.end_offsets(partitions)
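If the results for every topic need to be kept, the loop could collect all partitions first and make a single end_offsets call. The following is a rough sketch, not part of kafka-python: `end_offsets_by_topic` is a hypothetical helper, the TopicPartition class is passed in as an argument, and the `or ()` guard assumes partitions_for_topic may return None for a topic the client has no metadata for.

```python
from collections import defaultdict


def end_offsets_by_topic(consumer, topic_partition_cls):
    """Return {topic: {partition_id: end_offset}} for every visible topic.

    `consumer` is expected to behave like kafka.KafkaConsumer and
    `topic_partition_cls` like kafka.TopicPartition
    (both are assumptions of this sketch).
    """
    partitions = []
    for topic in consumer.topics():
        # Guard against a missing partition set (assumed to be None).
        partition_ids = consumer.partitions_for_topic(topic) or ()
        partitions.extend(topic_partition_cls(topic, p) for p in partition_ids)

    if not partitions:
        return {}

    # A single end_offsets call covers the partitions of all topics.
    result = defaultdict(dict)
    for tp, offset in consumer.end_offsets(partitions).items():
        result[tp.topic][tp.partition] = offset
    return dict(result)
```

With a real consumer this would be called as end_offsets_by_topic(consumer, TopicPartition). Grouping by topic name makes the result easy to look up, for example offsets['myTopic'][0].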