在 Kafka 0.9 中,有没有一种方法可以列出消费者组中所有消费者的偏移量?

In Kafka 0.9 is there a way I can list out the offset for all the consumers in a consumer group?

我正在使用 Kafka 0.9 new Consumer API

我让 Kafka 为消费者处理偏移量。我有消费者 运行 在多台机器上阅读同一主题。

我正在尝试找出以下内容:

我以为消费者组 - 消费者关系将存储在 ZooKeeper 中。我在 ZooKeeper 中看到消费者节点,它没有子节点。

据我查看代码可以看出偏移量正在写入kafka,但我不知道它们正在写入什么主题?

如果由 kafka 处理的偏移量未存储在 zookeeper 中,则它存储在 kafka-logs 文件夹中的主题调用“__consumer_offsets-#”中。

你可以在做poll()时通过查看KafkaRecords中的offset字段来找出每个消费者的偏移量,如果你想了解更多关于消费者组的信息,请查看bin/kafka-consumer-groups.sh

希望对您有所帮助!

@nautilus 指出的 __consumer_offsets 主题中似乎至少存储了 2 种类型的键值对。

  1. 群组元数据信息
  2. 偏移提交

据我所知,Kafka 使用自己的模式和序列化。您可以通过 kafka.coordinator.GroupMetadataManager:

了解更多关于这些结构的信息
  • GroupMetadataManager.OFFSET_COMMIT_KEY_SCHEMA
  • GroupMetadataManager.OFFSET_COMMIT_VALUE_SCHEMA_V0
  • GroupMetadataManager.GROUP_METADATA_KEY_SCHEMA
  • GroupMetadataManager.GROUP_METADATA_VALUE_SCHEMA_V0

正如@hba 提到的,encoding/decoding 的详细信息可以在靠近底部的 kafka.coordinator.GroupMetadataManager 中找到。寻找 readMessageKey 和以下两种方法。基本上,您需要的是一系列调用,例如

import org.apache.kafka.common.protocol.types.Type;
...
ByteBuffer bb = ByteBuffer.wrap(consumerRecord.key())
short version = bb.getShort();
String group = (String)Type.String.read(bb);
String topic = (String)Type.String.read(bb);
int partition = (int)Type.INT32.read(bb);

好处是 org.apache.kafka.common.protocol.types.Type 是 Java api 的一部分,独立于大主 Jar。丑陋的部分是,上面的代码片段并不完整。每个 consumerRecord.key()consumerRecord.value() 都有两个版本,其中一个必须模仿上述方法的解码。没什么大不了的,就是有点乏味。

如果你的项目可以依赖scala-jar,完整的kafka-jar和Kafka需要的一两个jar,你不妨使用GroupMetadataManager.readMessageKey(bb)和其他两种方法读取键和值。至少在 0.9.0.1 中是 public.