如何在 Golang Kafka 10 中获取分区的消费者组偏移量

How to get consumer group offsets for partition in Golang Kafka 10

现在 Golang Kafka 库 (sarama) 在没有任何外部库帮助的情况下为 kafka 10 提供消费者组功能。我怎样才能在任何给定时间获得消费者组正在处理的当前消息偏移量?

之前我用过kazoo-go (https://github.com/wvanbergen/kazoo-go) to get my consumer group message offset as it is stored in Zookeeper. Now I use sarama-cluster (https://github.com/bsm/sarama-cluster),我不确定使用哪个API来获取我的消费者组消息偏移。

我也在与 Sarama 和 Kafka 合作来获得主题的偏移量。

您可以通过以下代码获取偏移量。

    package main

    import (
     "gopkg.in/Shopify/sarama"
     "fmt"
    )

    func main(){
      client , err := sarama.Client([]string{"localhost:9092"},nil) // I am not giving any configuration
      if err != nil {
          panic(err)
      }
      lastoffset, err := client.GetOffset("topic-test",0,sarama.OffsetNewest)
      if err != nil {
          panic(err)
      }
      fmt.Println("Last Commited Offset ",lastoffset)
    }

让我知道这是否是您正在寻找的答案以及它是否有帮助。

在幕后 consumerGroupSession 结构使用 PartitionOffsetManagerget next offset:

    if pom := s.offsets.findPOM(topic, partition); pom != nil {
        offset, _ = pom.NextOffset()
    }

这是pom.NextOffset()的文档。

consumerGroupSession 通过 newConsumerGroupClaim() 方法构造 consumerGroupClaim 结构时,它传递由 pom.NextOffset() 返回的偏移量作为 offset 参数。您稍后可以通过 claim.InitialOffset() 访问它。开始消费消息后,您可以使用当前处理的消息的 message.Offset

不幸的是,无法从 ConsumerGroupHandler.ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) 方法访问 consumerGroupSession.offsets.findPOM(),因为它接收会话作为 ConsumerGroupSession 接口 ,而不是consumerGroupSession结构。所以 offsets 变量是私有的,不可访问。

因此我们无法真正访问 NextOffset() 方法,它正是 OP 想要的。

这是获取消费者组偏移量(即消费者组开始的偏移量)的示例代码:

package main
    
    import (
        "context"
        "log"
        "strings"
    
        "github.com/Shopify/sarama"
    )
    
    func main() {
        groupName := "testgrp"
        topic := "topic_name"
        offset, e := GetCGOffset(context.Background(), "localhost:9092", groupName, topic)
        if e != nil {
            log.Fatal(e)
        }
        log.Printf("Consumer group %s offset for topic %s is: %d", groupName, topic, offset)
    }
    
    type gcInfo struct {
        offset int64
    }
    
    func (g *gcInfo) Setup(sarama.ConsumerGroupSession) error {
        return nil
    }
    
    func (g *gcInfo) Cleanup(sarama.ConsumerGroupSession) error {
        return nil
    }
    
    func (g *gcInfo) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
        g.offset = claim.InitialOffset()
        return nil
    }
    
    func GetCGOffset(ctx context.Context, brokers, groupName, topic string) (int64, error) {
        config := sarama.NewConfig()
        config.Consumer.Offsets.AutoCommit.Enable = false // we're not going to update the consumer group offsets
        client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), groupName, config)
        if err != nil {
            return 0, err
        }
        info := gcInfo{}
        if err := client.Consume(ctx, []string{topic}, &info); err != nil {
            return 0, err
        }
        return info.offset, nil
    }