如何为我的消费者获取所有分区的当前偏移量?

How to get current offsets for all partitions for my consumer?

我正在尝试获取每个可用分区的当前偏移量。根据文档,consumer.position 应该可以解决问题,所以我这样尝试:

consumer = Consumer({
    'bootstrap.servers': config.BOOTSTRAP_SERVERS,
    'group.id': config.CONSUMER_GROUP,
    'enable.auto.commit': False,
})

# get all topics
topics = consumer.list_topics()

# get all partitions
partitions = []
for name, meta  in topics.topics.items():
    for partition_id in meta.partitions.keys():
        part = TopicPartition(name, partition_id)
        partitions.append(part)

# get all offsets
x = consumer.position(partitions)

但是,x 中生成的分区中的所有偏移量仍然是 -1001

如果我用 lenses 或其他工具检查,我可以看到这个结果不正确,我正在取消的消费者组已经消费了消息并将它们提交给 Kafka。

尝试添加 consumer.subscribe()consumer.assign() 函数

consumer = Consumer({
    'bootstrap.servers': config.BOOTSTRAP_SERVERS,
    'group.id': config.CONSUMER_GROUP,
    'enable.auto.commit': False,
})

# get all partitions
partitions = []
for name, meta  in topics.topics.items():
    for partition_id in meta.partitions.keys():
        part = TopicPartition(name, partition_id)
        partitions.append(part)

consumer.assign(partitions)
committed = consumer.committed(tp)

last_offset = consumer.position(tp)
print("topic: %s partition: %s committed: %s last: %s lag: %s" % (TOPIC, p, committed, last_offset))

作为参考,这是有效的解决方案:

consumer = Consumer({
    'bootstrap.servers': config.BOOTSTRAP_SERVERS,
    'group.id': config.CONSUMER_GROUP,
    'enable.auto.commit': False,
})

# get all topics
topics = consumer.list_topics()

# get all partitions
partitions = []
for name, meta  in topics.topics.items():
    for partition_id in meta.partitions.keys():
        part = TopicPartition(name, partition_id)
        partitions.append(part)

# get last committed offsets
partitions = consumer.committed(partitions)

显然 consumer.position 并不像宣传的那样工作,但是 consumer.committed returns 存储的偏移量,即使消费者当前没有订阅 topic/partition。