如何使用 confluent-kafka-python 获取 Kafka 主题的最后一条消息的偏移量?
How do I get the the offset of last message of a Kafka topic using confluent-kafka-python?
我需要使用 confluent-kafka-python
检索主题的最后 N 条消息。
我已经阅读https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#一天了,但没有找到任何合适的方法来获取最后一条消息的偏移量,因此我无法计算消费者开始的偏移量。
请帮忙。谢谢!
您需要 Consumer 的 get_watermark_offsets()
功能。你用 TopicPartition
的列表来调用它,它 returns 每个分区的元组 (int, int)
(低,高)。
像这样:
from confluent_kafka import Consumer, TopicPartition
# create the Consumer with a connection to your brokers
topic_name = "my.topic"
topicparts = [TopicPartition(topic_name, i) for i in range(0, 8)]
offsets = consumer.get_watermark_offsets(topicparts)
for p in enumerate(offsets):
msg = "partition {p} starting offset {so} last offset {lo}"
print(msg.format(p=p, so=offsets[p][0], lo=offsets[p][1]))
我需要使用 confluent-kafka-python
检索主题的最后 N 条消息。
我已经阅读https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#一天了,但没有找到任何合适的方法来获取最后一条消息的偏移量,因此我无法计算消费者开始的偏移量。
请帮忙。谢谢!
您需要 Consumer 的 get_watermark_offsets()
功能。你用 TopicPartition
的列表来调用它,它 returns 每个分区的元组 (int, int)
(低,高)。
像这样:
from confluent_kafka import Consumer, TopicPartition
# create the Consumer with a connection to your brokers
topic_name = "my.topic"
topicparts = [TopicPartition(topic_name, i) for i in range(0, 8)]
offsets = consumer.get_watermark_offsets(topicparts)
for p in enumerate(offsets):
msg = "partition {p} starting offset {so} last offset {lo}"
print(msg.format(p=p, so=offsets[p][0], lo=offsets[p][1]))