How to get the latest offset for a partition of a Kafka topic?
I'm using Kafka's high-level Python consumer and want to know the latest offset for each partition of a topic, but I can't get it to work.
from kafka import TopicPartition
from kafka.consumer import KafkaConsumer

con = KafkaConsumer(bootstrap_servers=brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]

con.assign(ps)
for p in ps:
    print("For partition %s highwater is %s" % (p.partition, con.highwater(p)))

print("Subscription = %s" % con.subscription())
print("con.seek_to_beginning() = %s" % con.seek_to_beginning())
But the output I get is:
For partition 0 highwater is None
For partition 1 highwater is None
For partition 2 highwater is None
For partition 3 highwater is None
For partition 4 highwater is None
For partition 5 highwater is None
....
For partition 96 highwater is None
For partition 97 highwater is None
For partition 98 highwater is None
For partition 99 highwater is None
Subscription = None
con.seek_to_beginning() = None
con.seek_to_end() = None
I tried another approach using assign, but the result was the same:
con = KafkaConsumer(bootstrap_servers=brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]

con.assign(ps)
for p in ps:
    print("For partition %s highwater is %s" % (p.partition, con.highwater(p)))

print("Subscription = %s" % con.subscription())
print("con.seek_to_beginning() = %s" % con.seek_to_beginning())
print("con.seek_to_end() = %s" % con.seek_to_end())
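Judging from some of the documentation, I may see this behavior if a fetch has not been issued yet, but I can't find a way to force one. What am I doing wrong? Or is there a different/simpler way to get the latest offsets for a topic?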
After spending a day on this and several false starts, I finally found a solution and got it working. Posting it here for others' reference.
# Legacy kafka-python API: SimpleClient and OffsetRequestPayload are not available in kafka-python 2.x
from kafka import SimpleClient
from kafka.common import OffsetRequestPayload

client = SimpleClient(brokers)
partitions = client.topic_partitions[topic]
# time=-1 requests the latest offset (-2 would request the earliest); max_offsets=1 returns a single offset
offset_requests = [OffsetRequestPayload(topic, p, -1, 1) for p in partitions.keys()]
offsets_responses = client.send_offset_request(offset_requests)
for r in offsets_responses:
    print("partition = %s, offset = %s" % (r.partition, r.offsets[0]))
If you want to use the Kafka shell scripts in kafka/bin, you can get the latest and earliest offsets with kafka-run-class.sh.
The command to get the latest offset is:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -1 --topic topicname
To get the earliest offset, the command is:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -2 --topic topicname
You can find more information about GetOffsetShell at the following link.
Hope this helps!
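GetOffsetShell prints one topic:partition:offset line per partition, so the outputs of the --time -1 and --time -2 runs can be combined to see how many offsets each partition currently retains. The sketch below assumes exactly that line format (other Kafka releases may print it differently), and the function names are illustrative, not part of any Kafka tool:

```python
from typing import Dict


def parse_get_offset_shell(output: str) -> Dict[int, int]:
    """Parse GetOffsetShell output ('topic:partition:offset' per line) into {partition: offset}."""
    offsets = {}
    for line in output.splitlines():
        line = line.strip()
        if not line:
            continue  # skip blank lines
        # rsplit keeps the topic intact and isolates the last two numeric fields
        _topic, partition, offset = line.rsplit(":", 2)
        offsets[int(partition)] = int(offset)
    return offsets


def retained_per_partition(latest_output: str, earliest_output: str) -> Dict[int, int]:
    """Latest (--time -1) minus earliest (--time -2) offset for each partition."""
    latest = parse_get_offset_shell(latest_output)
    earliest = parse_get_offset_shell(earliest_output)
    return {p: latest[p] - earliest[p] for p in latest}


if __name__ == "__main__":
    # Sample text in the assumed format; in practice, capture the two commands' stdout
    latest = "topicname:0:120\ntopicname:1:80\n"
    earliest = "topicname:0:20\ntopicname:1:80\n"
    print(retained_per_partition(latest, earliest))  # → {0: 100, 1: 0}
```

To feed it real data, capture each command's standard output, for example with subprocess.run(..., capture_output=True, text=True).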
from kafka import KafkaConsumer, TopicPartition

TOPIC = 'MYTOPIC'
GROUP = 'MYGROUP'
BOOTSTRAP_SERVERS = ['kafka01:9092', 'kafka02:9092']

consumer = KafkaConsumer(
    bootstrap_servers=BOOTSTRAP_SERVERS,
    group_id=GROUP,
    enable_auto_commit=False
)

for p in consumer.partitions_for_topic(TOPIC):
    tp = TopicPartition(TOPIC, p)
    consumer.assign([tp])
    committed = consumer.committed(tp)  # None if the group never committed for this partition
    consumer.seek_to_end(tp)
    last_offset = consumer.position(tp)  # offset of the next message to be written
    lag = last_offset - committed if committed is not None else None
    print("topic: %s partition: %s committed: %s last: %s lag: %s" % (TOPIC, p, committed, last_offset, lag))

consumer.close(autocommit=False)
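consumer.committed(tp) returns None when the group has never committed an offset for a partition, so subtracting it from the end offset needs a guard. A pure-Python sketch of the lag arithmetic, kept separate from the Kafka calls so it can be checked on its own; reporting None for "no committed offset" (rather than counting the whole partition as lag) is an assumption you may want to change, and the helper name is illustrative:

```python
from typing import Any, Dict, Optional


def consumer_lag(
    end_offsets: Dict[Any, int],
    committed: Dict[Any, Optional[int]],
) -> Dict[Any, Optional[int]]:
    """Lag per partition = end offset (next offset to be written) - committed offset.

    end_offsets: e.g. position() after seek_to_end(), or KafkaConsumer.end_offsets()
    committed:   e.g. KafkaConsumer.committed(tp) per partition (None if never committed)
    """
    lag: Dict[Any, Optional[int]] = {}
    for tp, end in end_offsets.items():
        done = committed.get(tp)
        if done is None:
            lag[tp] = None  # assumption: unknown lag rather than the whole partition
        else:
            # the end offset may be read just before a newer commit lands; clamp at 0
            lag[tp] = max(end - done, 0)
    return lag
```

With the loop above, you would store last_offset and committed per partition inside the loop and call consumer_lag once at the end.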
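Another approach is to poll the consumer to get the last consumed offset, then use the seek_to_end method to move to the most recent available offset of each partition.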
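from kafka import KafkaConsumer

consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])
consumer.poll()         # drives the group join; assignment() is empty until partitions are assigned
consumer.seek_to_end()  # returns None; moves every assigned partition to its end
for tp in consumer.assignment():
    print(tp, consumer.position(tp))  # position() now reports the latest offset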
This approach is particularly useful when you are using a consumer group.
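seek_to_end() returns None: in kafka-python it only schedules an offset reset, and the resulting offset is read with position(), which performs the pending lookup. A sketch that works on any kafka-python consumer that already has partitions assigned (the helper name is hypothetical). Note that it moves the consumer, so the next poll() skips any unread backlog:

```python
from typing import Any, Dict


def latest_positions(consumer: Any) -> Dict[Any, int]:
    """Move an already-assigned kafka-python KafkaConsumer to the end of each
    assigned partition and return {TopicPartition: next offset}.

    Side effect: the consumer's position changes, so a later poll() only
    returns records produced after this call.
    """
    partitions = consumer.assignment()  # empty until the group has assigned partitions
    if not partitions:
        return {}
    consumer.seek_to_end(*partitions)  # returns None; only schedules an offset reset
    # position() performs the pending lookup and returns the resulting offset
    return {tp: consumer.position(tp) for tp in partitions}
```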
Source:
With kafka-python >= 1.3.4, you can use:
kafka.KafkaConsumer.end_offsets(partitions)
Get the last offset for the given partitions. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1.
from kafka import TopicPartition
from kafka.consumer import KafkaConsumer

con = KafkaConsumer(bootstrap_servers=brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]
end_offsets = con.end_offsets(ps)  # {TopicPartition: offset of the next message}
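Because end_offsets() reports the offset of the next message, the offset of the last message in a partition is one less, and a partition whose end equals its beginning (kafka-python also provides beginning_offsets()) is empty. A small sketch of that arithmetic on the two returned dicts; the helper name is illustrative, and on compacted or transactional topics the record at end - 1 may not exist as a readable message:

```python
from typing import Any, Dict, Optional


def last_message_offsets(
    beginning: Dict[Any, int], end: Dict[Any, int]
) -> Dict[Any, Optional[int]]:
    """Offset of the last message in each partition, or None if the partition is empty.

    beginning / end: the dicts returned by KafkaConsumer.beginning_offsets(ps)
    and KafkaConsumer.end_offsets(ps). end is the *next* offset, hence the - 1.
    """
    return {tp: (end[tp] - 1 if end[tp] > beginning[tp] else None) for tp in end}
```

Usage with the code above: last_message_offsets(con.beginning_offsets(ps), con.end_offsets(ps)).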
You can use position (from confluent_kafka), which returns the consumer's current position for each partition:
Retrieve current positions (offsets) for the list of partitions.
from confluent_kafka import Consumer, TopicPartition

# group.id: placeholder value; confluent_kafka expects one when creating a Consumer
consumer = Consumer({"bootstrap.servers": "localhost:9092", "group.id": "offset-checker"})
topic = consumer.list_topics(topic='topicName')
partitions = [TopicPartition('topicName', partition) for partition in topic.topics['topicName'].partitions]
offset_per_partition = consumer.position(partitions)
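position() reports where this consumer is (the offset after the last message it consumed), not the end of the log. For partitions it has not consumed from, confluent_kafka fills in a negative logical offset such as OFFSET_INVALID (-1001). A sketch that separates real positions from those sentinels, assuming you first extract (partition, offset) pairs from the returned TopicPartition objects; the helper name is illustrative:

```python
from typing import Dict, Iterable, List, Tuple

# Logical offsets in confluent_kafka are negative (OFFSET_BEGINNING=-2, OFFSET_END=-1,
# OFFSET_STORED=-1000, OFFSET_INVALID=-1001); any real position is >= 0.


def split_positions(pairs: Iterable[Tuple[int, int]]) -> Tuple[Dict[int, int], List[int]]:
    """Separate real positions from logical/sentinel offsets.

    pairs: (partition, offset) tuples, e.g.
        [(tp.partition, tp.offset) for tp in consumer.position(partitions)]
    Returns ({partition: position}, [partitions with no known position]).
    """
    known: Dict[int, int] = {}
    unknown: List[int] = []
    for partition, offset in pairs:
        if offset < 0:
            unknown.append(partition)
        else:
            known[partition] = offset
    return known, unknown
```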
Alternatively, you can use get_watermark_offsets, but you have to pass one partition at a time, so it takes multiple calls:
Retrieve low and high offsets for partition.
from confluent_kafka import Consumer, TopicPartition

# group.id: placeholder value; confluent_kafka expects one when creating a Consumer
consumer = Consumer({"bootstrap.servers": "localhost:9092", "group.id": "offset-checker"})
topic = consumer.list_topics(topic='topicName')
partitions = [TopicPartition('topicName', partition) for partition in topic.topics['topicName'].partitions]
for p in partitions:
    # returns (low, high); high is the offset of the last message + 1
    low_offset, high_offset = consumer.get_watermark_offsets(p)
    print(f"Latest offset for partition {p.partition}: {high_offset}")
consumer.close()
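Two details of get_watermark_offsets are easy to miss: its timeout argument (in seconds) bounds each broker query, and the confluent_kafka documentation says it returns None rather than a (low, high) tuple when that timeout expires, so unpacking the result directly can fail. A hedged sketch that handles both; the helper name and the 10-second default are illustrative:

```python
from typing import Any, Dict, Iterable, Optional


def high_watermarks(
    consumer: Any, partitions: Iterable[Any], timeout: float = 10.0
) -> Dict[int, Optional[int]]:
    """Return {partition number: high watermark} via confluent_kafka's
    Consumer.get_watermark_offsets(), one broker query per partition.

    The high watermark is the offset of the last message + 1. A None result
    (timeout, per the confluent_kafka docs) is kept as None.
    """
    result: Dict[int, Optional[int]] = {}
    for tp in partitions:
        watermarks = consumer.get_watermark_offsets(tp, timeout=timeout, cached=False)
        result[tp.partition] = None if watermarks is None else watermarks[1]
    return result
```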
You can use end_offsets:
Get the last offset for the given partitions. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1.
This method does not change the current consumer position of the partitions.
from kafka import TopicPartition
from kafka.consumer import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
partitions = [TopicPartition('myTopic', p) for p in consumer.partitions_for_topic('myTopic')]
last_offset_per_partition = consumer.end_offsets(partitions)
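Because end_offsets() does not move the consumer, it can be paired with kafka-python's beginning_offsets() to see how much data each partition holds. The difference is a count of offsets, which is only an upper bound on readable messages (compaction and transaction markers leave gaps). A minimal sketch on the two returned dicts; the helper name is illustrative:

```python
from typing import Any, Dict, Tuple


def retained_offsets(
    beginning: Dict[Any, int], end: Dict[Any, int]
) -> Tuple[Dict[Any, int], int]:
    """Per-partition end - beginning offset, plus the total across partitions."""
    per_partition = {tp: end[tp] - beginning[tp] for tp in end}
    return per_partition, sum(per_partition.values())
```

Usage with the code above: per_partition, total = retained_offsets(consumer.beginning_offsets(partitions), last_offset_per_partition).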
kafka-consumer-groups --bootstrap-server host1:9093,crow-host2:9093,host3:9093 --command-config=/root/client.properties --describe --group atlas
This command shows the lag/offset status of the consumer group.
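The describe output is a whitespace-separated table. Assuming a header row that includes TOPIC, PARTITION and LAG columns (as printed by recent Kafka versions) and "-" where no offset has been committed, a sketch that extracts the per-partition lag from the captured text; the function name is illustrative:

```python
from typing import Dict, Optional, Tuple


def parse_group_lag(output: str) -> Dict[Tuple[str, int], Optional[int]]:
    """Return {(topic, partition): lag} from `kafka-consumer-groups --describe` text.

    A '-' in the LAG column (no committed offset) becomes None.
    """
    header = None
    lags: Dict[Tuple[str, int], Optional[int]] = {}
    for line in output.splitlines():
        fields = line.split()
        if not fields:
            continue
        if header is None or fields == header:
            # Wait for (or skip a repeated) header row naming the columns we need
            if {"TOPIC", "PARTITION", "LAG"} <= set(fields):
                header = fields
            continue
        if len(fields) != len(header):
            continue  # informational lines, e.g. about groups with no active members
        row = dict(zip(header, fields))
        lag = row["LAG"]
        lags[(row["TOPIC"], int(row["PARTITION"]))] = None if lag == "-" else int(lag)
    return lags
```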