在 Kafka 0.9 中,有没有一种方法可以列出消费者组中所有消费者的偏移量?
In Kafka 0.9 is there a way I can list out the offset for all the consumers in a consumer group?
我正在使用 Kafka 0.9 new Consumer API。
我让 Kafka 为消费者处理偏移量。我有消费者 运行 在多台机器上阅读同一主题。
我正在尝试找出以下内容:
- 注册消费者组的消费者
- 每个消费者的偏移量
我以为消费者组 - 消费者关系将存储在 ZooKeeper 中。我在 ZooKeeper 中看到消费者节点,它没有子节点。
据我查看代码可以看出偏移量正在写入kafka,但我不知道它们正在写入什么主题?
如果由 kafka 处理的偏移量未存储在 zookeeper 中,则它存储在 kafka-logs 文件夹中的主题调用“__consumer_offsets-#”中。
你可以在做poll()时通过查看KafkaRecords中的offset字段来找出每个消费者的偏移量,如果你想了解更多关于消费者组的信息,请查看bin/kafka-consumer-groups.sh
希望对您有所帮助!
@nautilus 指出的 __consumer_offsets 主题中似乎至少存储了 2 种类型的键值对。
- 群组元数据信息
- 偏移提交
据我所知,Kafka 使用自己的模式和序列化。您可以通过 kafka.coordinator.GroupMetadataManager
:
了解更多关于这些结构的信息
GroupMetadataManager.OFFSET_COMMIT_KEY_SCHEMA
GroupMetadataManager.OFFSET_COMMIT_VALUE_SCHEMA_V0
GroupMetadataManager.GROUP_METADATA_KEY_SCHEMA
GroupMetadataManager.GROUP_METADATA_VALUE_SCHEMA_V0
正如@hba 提到的,encoding/decoding 的详细信息可以在靠近底部的 kafka.coordinator.GroupMetadataManager
中找到。寻找 readMessageKey
和以下两种方法。基本上,您需要的是一系列调用,例如
import org.apache.kafka.common.protocol.types.Type;
...
ByteBuffer bb = ByteBuffer.wrap(consumerRecord.key())
short version = bb.getShort();
String group = (String)Type.String.read(bb);
String topic = (String)Type.String.read(bb);
int partition = (int)Type.INT32.read(bb);
好处是 org.apache.kafka.common.protocol.types.Type
是 Java api 的一部分,独立于大主 Jar。丑陋的部分是,上面的代码片段并不完整。每个 consumerRecord.key()
和 consumerRecord.value()
都有两个版本,其中一个必须模仿上述方法的解码。没什么大不了的,就是有点乏味。
如果你的项目可以依赖scala-jar,完整的kafka-jar和Kafka需要的一两个jar,你不妨使用GroupMetadataManager.readMessageKey(bb)
和其他两种方法读取键和值。至少在 0.9.0.1 中是 public.
我正在使用 Kafka 0.9 new Consumer API。
我让 Kafka 为消费者处理偏移量。我有消费者 运行 在多台机器上阅读同一主题。
我正在尝试找出以下内容:
- 注册消费者组的消费者
- 每个消费者的偏移量
我以为消费者组 - 消费者关系将存储在 ZooKeeper 中。我在 ZooKeeper 中看到消费者节点,它没有子节点。
据我查看代码可以看出偏移量正在写入kafka,但我不知道它们正在写入什么主题?
如果由 kafka 处理的偏移量未存储在 zookeeper 中,则它存储在 kafka-logs 文件夹中的主题调用“__consumer_offsets-#”中。
你可以在做poll()时通过查看KafkaRecords中的offset字段来找出每个消费者的偏移量,如果你想了解更多关于消费者组的信息,请查看bin/kafka-consumer-groups.sh
希望对您有所帮助!
@nautilus 指出的 __consumer_offsets 主题中似乎至少存储了 2 种类型的键值对。
- 群组元数据信息
- 偏移提交
据我所知,Kafka 使用自己的模式和序列化。您可以通过 kafka.coordinator.GroupMetadataManager
:
GroupMetadataManager.OFFSET_COMMIT_KEY_SCHEMA
GroupMetadataManager.OFFSET_COMMIT_VALUE_SCHEMA_V0
GroupMetadataManager.GROUP_METADATA_KEY_SCHEMA
GroupMetadataManager.GROUP_METADATA_VALUE_SCHEMA_V0
正如@hba 提到的,encoding/decoding 的详细信息可以在靠近底部的 kafka.coordinator.GroupMetadataManager
中找到。寻找 readMessageKey
和以下两种方法。基本上,您需要的是一系列调用,例如
import org.apache.kafka.common.protocol.types.Type;
...
ByteBuffer bb = ByteBuffer.wrap(consumerRecord.key())
short version = bb.getShort();
String group = (String)Type.String.read(bb);
String topic = (String)Type.String.read(bb);
int partition = (int)Type.INT32.read(bb);
好处是 org.apache.kafka.common.protocol.types.Type
是 Java api 的一部分,独立于大主 Jar。丑陋的部分是,上面的代码片段并不完整。每个 consumerRecord.key()
和 consumerRecord.value()
都有两个版本,其中一个必须模仿上述方法的解码。没什么大不了的,就是有点乏味。
如果你的项目可以依赖scala-jar,完整的kafka-jar和Kafka需要的一两个jar,你不妨使用GroupMetadataManager.readMessageKey(bb)
和其他两种方法读取键和值。至少在 0.9.0.1 中是 public.