如何获取消费者组的最后消费偏移量?

How to get last consumed offset for a consumer group?

我在一个消费者组中有两个消费者,它们分配了相同的 kafka 主题分区。我希望从消费者 B 中获取消费者 A 的最后读取偏移量。任何想法,如何实现这个?

永远不会将单个分区分配给同一组中的两个消费者实例。

您可以使用下面的脚本来了解最后消耗的偏移量

sh kafka-consumer-groups.sh --bootstrap-server localhost:9092 --new-consumer --group groupname --describe

Kafka store offsets by (consumer-group-id, topic, partition) 所以首先要注意的是,从 Kafka 的角度来看,没有像 "last read offset of consumer A" 这样的东西。您可以通过 Kafka 消费者 API 获得的所有信息都是针对给定的(组、主题、分区)。您在消费者 API 中有两个可能有用的方法。

commited():获取给定分区的最后提交的偏移量(无论提交是由这个进程还是另一个进程发生)。

position(): 获取下一条将要获取的记录的偏移量(如果存在具有该偏移量的记录)。

如果这不是您所需要的,那么您将不得不自己实施一些东西。假设您已经知道如何从消费者 A 读取最后的偏移量,那么消费者 A 应该将该值存储在消费者 B 可用的某个位置。这个位置可能是

  • 卡夫卡本身。例如,消费者 A 可以将上次读取的偏移量发布到 一个众所周知的主题,如 ConsumerA-p0 和 Consumer B 可以订阅 这个话题。
  • 动物园管理员。同样,同意一条众所周知的道路。
  • 外部数据库。
  • 如果两个消费者共享同一个 OS,则有更多基本选项:IPC、文件系统中的文件、内存中受锁保护的变量等。

kafka->bin 使用以下命令将您的组 ID 更改为 groupId:

sh kafka-consumer-groups.sh --bootstrap-server localhost:29092 --group groupId --describe

你会得到如下输出:

TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
topic1 0          0               35              35              -               -               -
topic2 0          1600            1600            0               -               -               -