Kafka 如何从 __consumer_offsets 主题中读取

Kafka how to read from __consumer_offsets topic

我试图找出我当前的高级消费者正在处理哪些抵消。我使用 Kafka 0.8.2.1,在 Kafka 的 server.properties 中设置 no "offset.storage" - 我认为这意味着偏移量存储在 Kafka 中。 (我还通过检查 Zk shell: /consumers/consumer_group_name/offsets/topic_name/partition_number 中的这条路径验证了 Zookeeper 中没有存储任何偏移量)

我试过收听__consumer_offsets主题,看看哪个消费者保存了多少偏移量,但是没有用...

我尝试了以下方法:

为控制台消费者创建了一个配置文件,如下所示:

=> more kafka_offset_consumer.config 

 exclude.internal.topics=false

并尝试了两个版本的控制台消费者脚本:

#1:
bin/kafka-console-consumer.sh --consumer.config kafka_offset_consumer.config --topic __consumer_offsets --zookeeper localhost:2181

#2
./bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 0 --broker-list localhost:9092 --formatter "kafka.server.OffsetManager$OffsetsMessageFormatter" --consumer.config kafka_offset_consumer.config

都没有用 - 它只是坐在那里但不打印任何东西,即使消费者正在积极 consuming/saving 抵消。

我是不是漏掉了一些其他的 configuration/properties?

谢谢!

玛丽娜

如果您添加 --from-beginning 应该很可能会给您一些结果,至少当我自己尝试时是这样。或者,如果您不提供该参数但在您让消费者收听时阅读更多消息(并触发偏移量提交),那也应该在那里显示消息。

好的,我知道问题出在哪里了。我的 Kafka 实际上是使用 Zookeeper 作为偏移存储,而不是 Kafka ....我没有立即检测到的原因是因为我错误地检查了 ZK 内容:

我在做

ls  /consumers/consumer_group_name/offsets/topic_name/partition_number

那里什么也看不见。相反,我必须 'get' 内容 - 它确实为我的消费者显示了正确的偏移量,如下所示:

get /consumers/consumer_group_name/offsets/topic_name/partition_number 
185530404
cZxid = 0x70789ad05
ctime = Mon Nov 23 17:49:46 GMT 2015
mZxid = 0x7216cdc5c
mtime = Thu Dec 03 20:18:57 GMT 2015
pZxid = 0x70789ad05
cversion = 0
dataVersion = 3537384
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 9
numChildren = 0

我在尝试使用 __consumer_offsets 主题时遇到了这个问题。 我设法为不同的 Kafka 版本弄清楚了,并认为我会分享我发现的东西

对于卡夫卡 0.8。2.x

注意:这里使用Zookeeper连接

#Create consumer config
echo "exclude.internal.topics=false" > /tmp/consumer.config
#Consume all offsets
./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \
--formatter "kafka.server.OffsetManager$OffsetsMessageFormatter" \
--zookeeper localhost:2181 --topic __consumer_offsets --from-beginning

对于 Kafka 0.9.x.x 和 0.10.x.x

#Create consumer config
echo "exclude.internal.topics=false" > /tmp/consumer.config
#Consume all offsets
./kafka-console-consumer.sh --new-consumer --consumer.config /tmp/consumer.config \
--formatter "kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter" \
--bootstrap-server localhost:9092 --topic __consumer_offsets --from-beginning

对于 0.11.x.x - 2.x

#Create consumer config
echo "exclude.internal.topics=false" > /tmp/consumer.config
#Consume all offsets
./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \
--formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter" \
--bootstrap-server localhost:9092 --topic __consumer_offsets --from-beginning

从 Kafka 0.11 开始,the (Scala) source code can be found here

对于那些需要 Java 翻译的人,假设您从任何消费者进程中获得 ConsumerRecord<byte[], byte[]> consumerRecord,您可以使用

  1. 获取key,(先检查key是否不为null)然后使用GroupMetadataManager.readMessageKey(consumerRecord.key)。这可以 return 不同的类型,所以检查 if ( ... instanceof OffsetKey),然后转换它,你可以从中得到各种值。

  2. 获取offsets的Kafka记录值,可以使用String.valueOf(GroupMetadataManager.readOffsetMessageValue(consumerRecord.value))

从 Scala 代码翻译的最小 Java 示例...

byte[] key = consumerRecord.key;
if (key != null) {
    Object o = GroupMetadataManager.readMessageKey(key);
    if (o != null && o instanceOf OffsetKey) {
        OffsetKey offsetKey = (OffsetKey) o;
        Object groupTopicPartition = offsetKey.key;
        byte[] value = consumerRecord.value;
        String formattedValue = String.valueOf(GroupMetadataManager.readOffsetMessageValue(value);
       // TODO: Print, store, or compute results with the new key and value 
    }
}

请注意,也可以使用 AdminClient API 来描述组而不是使用这些原始消息


Scala 源代码摘录

def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
  Option(consumerRecord.key).map(key => GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {
    // Only print if the message is an offset record.
    // We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
    case offsetKey: OffsetKey =>
      val groupTopicPartition = offsetKey.key
      val value = consumerRecord.value
      val formattedValue =
        if (value == null) "NULL"
        else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString
      output.write(groupTopicPartition.toString.getBytes(StandardCharsets.UTF_8))
      output.write("::".getBytes(StandardCharsets.UTF_8))
      output.write(formattedValue.getBytes(StandardCharsets.UTF_8))
      output.write("\n".getBytes(StandardCharsets.UTF_8))
    case _ => // no-op
  }

对于 Kafka-2.X 使用下面的命令

kafka-console-consumer --bootstrap-server localhost:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter"