Kafka 使用 admin API 提交和最后的偏移量

Kafka committed and last offsets using admin API

我正在使用管理客户端 API 查询 kafka 代理,以使用以下代码获取 CONSUMER_GROUP 的提交偏移量:

Map<TopicPartition, OffsetAndMetadata> offsets =
                admin.listConsumerGroupOffsets(CONSUMER_GROUP)
                        .partitionsToOffsetAndMetadata().get();

以上代码将触发对特殊创建的 __consumer_offsets 主题的查询,以获取主题分区的每个分区的提交偏移量CONSUMER_GROUP 负责。

另一方面,我使用下面的代码检索 CONSUMER_GROUP

的每个主题分区的最新(结束)偏移量
for(TopicPartition tp: offsets.keySet()) {
requestLatestOffsets.put(tp, OffsetSpec.latest());
}

Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestOffsets =
admin.listOffsets(requestLatestOffsets).all().get();

for (Map.Entry<TopicPartition, OffsetAndMetadata> e: offsets.entrySet()) {
long latestOffset = latestOffsets.get(e.getKey()).offset();

我的问题是提交的和最新的偏移量因此 queried/requested 来自两个不同的主题。从 __consumer_offsets 主题请求提交的偏移量,从 CONSUMER_GROUP 的实际主题请求最新(结束)偏移量。

(1) 上面关于请求已提交和最新偏移量的描述是否准确?

(2)是否可以直接查询__consumer_offsets话题?

谢谢。

  1. 是的,你的理解是正确的。提交的偏移量存储在 __consumer_offsets 主题中,而您需要查询特定分区以获得它们的结束偏移量。

  2. 是的__consumer_offsets是正则题,喜欢的可以直接消费。通过提供的 API 检索数据通常更容易,但如果您对其内容感兴趣,您可以使用它。如果您想了解如何反序列化数据,请选中 console Formatters