How to get current offsets for all partitions for my consumer?
I'm trying to get the current offset for every available partition. According to the documentation, consumer.position should do the trick, so I tried this:
from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({
    'bootstrap.servers': config.BOOTSTRAP_SERVERS,
    'group.id': config.CONSUMER_GROUP,
    'enable.auto.commit': False,
})
# get all topics
topics = consumer.list_topics()
# get all partitions
partitions = []
for name, meta in topics.topics.items():
    for partition_id in meta.partitions.keys():
        part = TopicPartition(name, partition_id)
        partitions.append(part)
# get all offsets
x = consumer.position(partitions)
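However, every offset in the partitions returned in x is still -1001.
If I check with Lenses or other tools, I can see that this result is not correct: the consumer group I'm using has already consumed messages and committed them to Kafka.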
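For context, -1001 matches the OFFSET_INVALID sentinel exported by confluent_kafka, which suggests the client simply has no position for those partitions yet rather than reporting a real offset. A minimal sketch for making such values readable is below; the constants are redefined locally so it runs without the library, and describe_offset is a hypothetical helper name, not part of the client API.

```python
# Sentinel offsets, copied by value from the confluent_kafka module constants
# so this sketch runs without the client library or a broker.
OFFSET_BEGINNING = -2
OFFSET_END = -1
OFFSET_STORED = -1000
OFFSET_INVALID = -1001

_SENTINELS = {
    OFFSET_BEGINNING: "OFFSET_BEGINNING",
    OFFSET_END: "OFFSET_END",
    OFFSET_STORED: "OFFSET_STORED",
    OFFSET_INVALID: "OFFSET_INVALID",
}


def describe_offset(offset):
    """Return the sentinel name for special offsets, else the number as text."""
    return _SENTINELS.get(offset, str(offset))
```

Calling describe_offset(tp.offset) on each returned TopicPartition makes it obvious which partitions have no usable value.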
Try adding a call to consumer.subscribe() or consumer.assign():
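from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({
    'bootstrap.servers': config.BOOTSTRAP_SERVERS,
    'group.id': config.CONSUMER_GROUP,
    'enable.auto.commit': False,
})
# get all topics
topics = consumer.list_topics()
# get all partitions
partitions = []
for name, meta in topics.topics.items():
    for partition_id in meta.partitions.keys():
        part = TopicPartition(name, partition_id)
        partitions.append(part)
consumer.assign(partitions)
# committed() and position() both take and return lists of TopicPartition
committed = consumer.committed(partitions)
# position() may still report -1001 for partitions this instance has not fetched from yet
positions = consumer.position(partitions)
for c, p in zip(committed, positions):
    print("topic: %s partition: %s committed: %s last: %s" % (c.topic, c.partition, c.offset, p.offset))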
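If lag is what you are after, it is commonly taken as the partition's high watermark minus the committed offset. The sketch below shows only that arithmetic; compute_lag is a hypothetical helper, and treating a missing commit as "unknown" (None) is an assumption, since the effective starting point then depends on auto.offset.reset.

```python
def compute_lag(committed_offset, high_watermark):
    """Return how many messages remain after the committed offset.

    Negative offsets (such as -1001) are sentinels meaning nothing has been
    committed, so the lag is reported as None instead of a misleading number.
    """
    if committed_offset < 0:
        return None
    # Clamp at zero in case the watermark read is older than the commit.
    return max(high_watermark - committed_offset, 0)
```

With a real consumer, the high watermark for a single TopicPartition tp can be read with low, high = consumer.get_watermark_offsets(tp), and the committed offset comes from the entries returned by consumer.committed().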
For reference, this is the solution that works:
from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({
    'bootstrap.servers': config.BOOTSTRAP_SERVERS,
    'group.id': config.CONSUMER_GROUP,
    'enable.auto.commit': False,
})
# get all topics
topics = consumer.list_topics()
# get all partitions
partitions = []
for name, meta in topics.topics.items():
    for partition_id in meta.partitions.keys():
        part = TopicPartition(name, partition_id)
        partitions.append(part)
# get last committed offsets
partitions = consumer.committed(partitions)
Apparently consumer.position doesn't work as advertised, but consumer.committed returns the stored offsets even when the consumer is not currently subscribed to the topic/partition.
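To make the result of consumer.committed easier to work with, it can be folded into a dictionary keyed by topic and partition. This is only a sketch: committed_offsets is a hypothetical helper, the 10-second timeout is an arbitrary assumption, and the code relies only on the topic, partition, offset and error attributes that the returned TopicPartition objects expose.

```python
def committed_offsets(consumer, partitions, timeout=10.0):
    """Map (topic, partition) to the committed offset, or None if none is stored.

    `consumer` is expected to behave like confluent_kafka.Consumer and
    `partitions` to be a list of TopicPartition objects.
    """
    offsets = {}
    for tp in consumer.committed(partitions, timeout=timeout):
        # Skip partitions for which the broker reported an error.
        if getattr(tp, "error", None) is not None:
            continue
        # Negative values such as -1001 are sentinels, not real offsets.
        offsets[(tp.topic, tp.partition)] = tp.offset if tp.offset >= 0 else None
    return offsets
```

Called as committed_offsets(consumer, partitions) with the partition list built above, it returns one entry per partition without requiring subscribe() or assign().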