Sarama Kafka 消费者组:如何让函数返回消费到的消息

Sarama Kafka consumer group: returning consumed messages from a function

我是 Go 语言的新手,正在尝试对一个开源库做一些修改,该库使用 Sarama 库来消费 Kafka 消息。原始代码可以在这里(here)找到。

原始包实现的是 PartitionConsumer。如果不需要在多个消费同一主题的消费者之间保持读取一致性,它就能正常工作;但我需要这种一致性,所以它不适合我。

我在同一个应用程序中做了一些尝试,参考网上找到的示例,改用 sarama 的 NewConsumerGroup 来实现消费者组。

下面是我目前正在运行的代码:

package main

import (
    "context"
    // "flag"
    "os"
    "os/signal"
    "sync"
    "syscall"

    "encoding/json"
    "log"
    "strings"

    "github.com/Shopify/sarama"
    // "github.com/Shopify/sarama/mocks"
)

// KafkaInput receives Kafka messages and transforms them into HTTP
// payloads, handing them out one at a time through Read.
type KafkaInput struct {
    config *KafkaConfig
    // messages buffers consumed Kafka messages until Read receives them.
    messages chan *sarama.ConsumerMessage
}

// Package-level settings carried over from the sarama consumer-group
// example. All string settings start empty; oldest defaults to true and
// verbose to false.
var (
    brokers, version, group, topics, assignor string

    oldest  = true
    verbose bool
)

// Consumer is the sarama consumer-group handler used by this example: it
// provides Setup, Cleanup and ConsumeClaim.
type Consumer struct {
    ready chan bool // closed by Setup to report that a session has started
}

// NewKafkaInput creates instance of kafka consumer client.
func NewKafkaInput(address string, config *KafkaConfig) *KafkaInput {
    /**
     * Construct a new Sarama configuration.
     * The Kafka cluster version has to be defined before the consumer/producer is initialized.
     */
    c := sarama.NewConfig()
    // Configuration options go here

    log.Println("Starting a new Sarama consumer")

    if verbose {
        sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
    }

    version, err := sarama.ParseKafkaVersion("2.1.1")
    if err != nil {
        log.Panicf("Error parsing Kafka version: %v", err)
    }

    c.Version = version

    if oldest {
        c.Consumer.Offsets.Initial = sarama.OffsetOldest
    }

    /**
     * Setup a new Sarama consumer group
     */
    consumer := Consumer{ready: make(chan bool)}

    ctx, cancel := context.WithCancel(context.Background())
    client, err := sarama.NewConsumerGroup(strings.Split(config.host, ","), config.group, c)

    if err != nil {
        log.Panicf("Error creating consumer group client: %v", err)
    }

    wg := &sync.WaitGroup{}
    wg.Add(1)
    go func() {
        defer wg.Done()
        for {
            if err := client.Consume(ctx, []string{config.topic}, &consumer); err != nil {
                log.Panicf("Error from consumer: %v", err)
            }

            // check if context was cancelled, signaling that the consumer should stop
            if ctx.Err() != nil {
                return
            }

            consumer.ready = make(chan bool)
        }

    }()

    <-consumer.ready // Await till the consumer has been set up
    log.Println("Sarama consumer up and running!...")

    sigterm := make(chan os.Signal, 1)
    signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
    select {
    case <-ctx.Done():
        log.Println("terminating: context cancelled")
    case <-sigterm:
        log.Println("terminating: via signal")
    }
    cancel()
    wg.Wait()
    if err = client.Close(); err != nil {
        log.Panicf("Error closing client: %v", err)
    }

    i := &KafkaInput{
        config: config,
        // consumers: make([]sarama.PartitionConsumer, len(partitions)),
        // messages:  make(chan *sarama.ConsumerMessage, 256),
        messages: make(chan *sarama.ConsumerMessage, 256),
    }


    return i
}

// ConsumeClaim logs every message delivered on the claim and marks it as
// consumed, returning once the claim's Messages channel is closed.
// sarama already invokes ConsumeClaim on its own goroutine, so this loop
// must run inline rather than being moved into a new goroutine:
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    incoming := claim.Messages()
    for {
        msg, open := <-incoming
        if !open {
            return nil
        }
        log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(msg.Value), msg.Timestamp, msg.Topic)
        session.MarkMessage(msg, "")
    }
}

// ErrorHandler logs every error reported by the given partition consumer,
// returning once its Errors channel is closed.
func (i *KafkaInput) ErrorHandler(consumer sarama.PartitionConsumer) {
    errs := consumer.Errors()
    for {
        e, ok := <-errs
        if !ok {
            return
        }
        log.Println("Failed to read access log entry:", e)
    }
}

// Read blocks until the next Kafka message arrives and copies its payload
// into data, returning the number of bytes written.
//
// When config.useJSON is false the raw message value is copied as-is;
// otherwise the value is decoded into a KafkaMessage and the output of its
// Dump method is copied instead. If data is shorter than the payload, the
// payload is truncated and the returned count reflects only the bytes
// actually copied, so callers can safely use data[:n].
//
// Decoding errors (from json.Unmarshal or Dump) are logged and returned
// with a zero count instead of passing a half-filled KafkaMessage on.
func (i *KafkaInput) Read(data []byte) (int, error) {
    message := <-i.messages

    if !i.config.useJSON {
        return copy(data, message.Value), nil
    }

    var kafkaMessage KafkaMessage
    if err := json.Unmarshal(message.Value, &kafkaMessage); err != nil {
        log.Println("Failed to unmarshal Kafka message:", err)
        return 0, err
    }

    buf, err := kafkaMessage.Dump()
    if err != nil {
        log.Println("Failed to decode access log entry:", err)
        return 0, err
    }

    return copy(data, buf), nil
}

// String describes the input as "Kafka Input: <host>/<topic>".
func (i *KafkaInput) String() string {
    host, topic := i.config.host, i.config.topic
    return "Kafka Input: " + host + "/" + topic
}

// Setup runs at the start of every new session, before any ConsumeClaim
// call, and reports readiness by closing consumer.ready.
// NOTE: close panics on an already-closed channel, so a fresh ready
// channel must be installed before each new session.
func (consumer *Consumer) Setup(_ sarama.ConsumerGroupSession) error {
    ready := consumer.ready
    close(ready)
    return nil
}

// Cleanup runs at the end of a session, after every ConsumeClaim goroutine
// has returned. This consumer holds nothing that needs releasing.
func (consumer *Consumer) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

KafkaConfig 携带了消费者的 groupID 和 topic。当我运行这个程序时,消费者会启动,用正确的组从正确的主题读取消息,并通过下面这个 ConsumeClaim 函数把它们打印到 STDOUT:

// ConsumeClaim logs each message of the claim and marks it as consumed,
// returning once the claim's Messages channel is closed.
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    incoming := claim.Messages()
    for {
        msg, open := <-incoming
        if !open {
            return nil
        }
        log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(msg.Value), msg.Timestamp, msg.Topic)
        session.MarkMessage(msg, "")
    }
}

但我认为我需要的是:让 NewKafkaInput 函数返回 *KafkaInput,并把消费到的消息放进结构体中声明的 messages 通道里(如果我的术语用得不对,请见谅,这是我第一次写 Go)。

... 
i := &KafkaInput{
        config: config,
        // consumers: make([]sarama.PartitionConsumer, len(partitions)),
        // messages:  make(chan *sarama.ConsumerMessage, 256),
        messages: make(chan *sarama.ConsumerMessage, 256),
    }


    return i
}

在原始示例中,这一步是这样完成的:

// NewKafkaInput (excerpt from the original PartitionConsumer-based
// implementation; elided parts are marked with "...").
//
// The part shown forwards every message from a partition consumer into
// i.messages, which is the channel Read receives from.
func NewKafkaInput(address string, config *KafkaConfig) *KafkaInput {

    ...

    // Presumably started once per partition consumer inside an elided loop —
    // confirm against the original source. The goroutine drains
    // consumer.Messages() into i.messages and closes the partition consumer
    // when that channel is closed and the range loop ends.
    go func(consumer sarama.PartitionConsumer) {
                defer consumer.Close()

                for message := range consumer.Messages() {
                    i.messages <- message
                }
            }(consumer)

    ...

}

我花了好几天时间,尝试把函数移入、移出 NewKafkaInput,尝试在该函数外部把消息添加到 KafkaInput 结构体中,以及介于两者之间的各种做法,但就是无法让它工作。NewKafkaInput 函数需要返回一个能持续收到消息的 *KafkaInput,这样下面这个函数才能完成工作:

// Read blocks until the next Kafka message arrives and copies its payload
// into data, returning the number of bytes written.
//
// When config.useJSON is false the raw message value is copied as-is;
// otherwise the value is decoded into a KafkaMessage and the output of its
// Dump method is copied instead. The returned count is the number of bytes
// actually copied (the payload is truncated if data is too short), so
// callers can safely use data[:n].
//
// Decoding errors (from json.Unmarshal or Dump) are logged and returned
// with a zero count.
func (i *KafkaInput) Read(data []byte) (int, error) {
    message := <-i.messages

    if !i.config.useJSON {
        return copy(data, message.Value), nil
    }

    var kafkaMessage KafkaMessage
    if err := json.Unmarshal(message.Value, &kafkaMessage); err != nil {
        log.Println("Failed to unmarshal Kafka message:", err)
        return 0, err
    }

    buf, err := kafkaMessage.Dump()
    if err != nil {
        log.Println("Failed to decode access log entry:", err)
        return 0, err
    }

    return copy(data, buf), nil
}

我完全有可能把这件事搞得一团糟,非常感谢任何帮助和意见。

谢谢

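这是我的问题的解决方案。之前是我的 goroutine 阻塞了主函数,需要把它们拆分出来。关键在于:不再在 NewKafkaInput 里阻塞等待信号,而是在后台 goroutine 中循环调用 Consume,并让 ConsumeClaim 把每条消息推送到 Read 所读取的同一个通道。如果下面的代码看不明白,这里是我正在修改的程序的链接:https://github.com/buger/goreplay。如果能得到项目所有者的回复,我打算清理代码并提交拉取请求,或者发布一个分支(fork)。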

package main

import (
    "context"
    "encoding/json"
    "strings"

    "os"

    "log"

    "github.com/Shopify/sarama"
)

// KafkaInput receives Kafka messages through a sarama consumer group and
// transforms them into HTTP payloads returned by Read.
type KafkaInput struct {
    // The embedded ConsumerGroup provides Consume, which loop calls with the
    // KafkaInput itself as the consumer-group handler.
    sarama.ConsumerGroup
    config   *KafkaConfig
    consumer Consumer                     // holds the channel Push writes to
    messages chan *sarama.ConsumerMessage // channel Read receives from
}

// Consumer carries the channels KafkaInput shares with its consumer-group
// callbacks.
type Consumer struct {
    ready    chan bool                    // created by NewKafkaInput; not read elsewhere in this listing
    messages chan *sarama.ConsumerMessage // queue that Push writes consumed messages into
}

// Package-level settings carried over from the sarama consumer-group
// example. All string settings start empty; oldest defaults to true and
// verbose to false.
var (
    brokers, version, group, topics, assignor string

    oldest  = true
    verbose bool
)

// NewKafkaInput creates a sarama consumer group for config.group on the
// comma-separated brokers in config.host and starts consuming config.topic
// in a background goroutine. The returned *KafkaInput's Read method yields
// the consumed messages.
//
// address is currently unused; brokers are taken from config.host.
// Configuration and connection failures are fatal (log.Panicf), matching
// the version-parsing error path.
func NewKafkaInput(address string, config *KafkaConfig) *KafkaInput {
    c := sarama.NewConfig()

    log.Printf("KafkaConfig: %s", config.host)
    log.Printf("KafkaConfig: %s", config.group)
    log.Printf("KafkaConfig: %s", config.topic)

    log.Println("Starting a new Sarama consumer")

    if verbose {
        sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
    }

    // The Kafka cluster version has to be set before the consumer group is
    // created.
    version, err := sarama.ParseKafkaVersion("2.1.1")
    if err != nil {
        log.Panicf("Error parsing Kafka version: %v", err)
    }
    c.Version = version

    if oldest {
        c.Consumer.Offsets.Initial = sarama.OffsetOldest
    }

    group, err := sarama.NewConsumerGroup(strings.Split(config.host, ","), config.group, c)
    if err != nil {
        // Without this check a nil ConsumerGroup would be embedded and the
        // background loop would panic on its first Consume call.
        log.Panicf("Error creating consumer group client: %v", err)
    }

    // One channel shared by Push (writer, via i.consumer.messages) and Read
    // (reader, via i.messages).
    messages := make(chan *sarama.ConsumerMessage, 256)

    i := &KafkaInput{
        ConsumerGroup: group,
        config:        config,
        messages:      messages,
        consumer: Consumer{
            ready:    make(chan bool),
            messages: messages,
        },
    }

    // Start consuming only once i is fully initialised.
    go i.loop([]string{config.topic})
    return i
}

// ConsumeClaim forwards every message of the claim to Read via Push and
// then marks it as consumed, returning once the claim's Messages channel
// is closed.
//
// Marking only after Push means an offset is never committed for a
// message that has not yet been queued for Read. Push blocks until the
// channel can accept the message, so this loop also waits on Read.
func (i *KafkaInput) ConsumeClaim(s sarama.ConsumerGroupSession, c sarama.ConsumerGroupClaim) error {
    for msg := range c.Messages() {
        i.Push(msg)
        s.MarkMessage(msg, "")
    }
    return nil
}

// loop repeatedly joins the consumer group for the given topics, using the
// KafkaInput itself as the handler. Consume returns whenever a session ends
// (e.g. on a rebalance), so it is called again. Any error ends the loop;
// it is logged rather than silently dropped so a stalled consumer is
// visible.
func (i *KafkaInput) loop(topic []string) {
    ctx := context.Background()
    for {
        if err := i.Consume(ctx, topic, i); err != nil {
            log.Printf("Error from consumer: %v", err)
            return
        }
    }
}

// Push queues m for Read on the consumer's message channel and blocks until
// the channel accepts it. It is a no-op when no message channel has been
// set.
func (i *KafkaInput) Push(m *sarama.ConsumerMessage) {
    if i.consumer.messages == nil {
        return
    }
    // Log the message's coordinates with matching verbs; %s on the whole
    // *ConsumerMessage mis-formats its integer fields (go vet printf).
    log.Printf("MSGPUSH: topic=%s partition=%d offset=%d", m.Topic, m.Partition, m.Offset)
    i.consumer.messages <- m
}

// Read blocks until the next consumed Kafka message is available, logs its
// value, and copies the payload into data, returning the number of bytes
// written.
//
// When config.useJSON is false the raw message value is copied as-is;
// otherwise the value is decoded into a KafkaMessage and the output of its
// Dump method is copied instead. The returned count is the number of bytes
// actually copied (the payload is truncated if data is too short), so
// callers can safely use data[:n].
//
// Decoding errors (from json.Unmarshal or Dump) are logged and returned
// with a zero count.
func (i *KafkaInput) Read(data []byte) (int, error) {
    message := <-i.messages
    log.Printf("Msg: %s", message.Value)

    if !i.config.useJSON {
        return copy(data, message.Value), nil
    }

    var kafkaMessage KafkaMessage
    if err := json.Unmarshal(message.Value, &kafkaMessage); err != nil {
        log.Println("Failed to unmarshal Kafka message:", err)
        return 0, err
    }

    buf, err := kafkaMessage.Dump()
    if err != nil {
        log.Println("Failed to decode access log entry:", err)
        return 0, err
    }

    return copy(data, buf), nil
}

// String describes the input as "Kafka Input: <host>/<topic>".
func (i *KafkaInput) String() string {
    host, topic := i.config.host, i.config.topic
    return "Kafka Input: " + host + "/" + topic
}

// Setup is called at the start of every new session, before any
// ConsumeClaim call. KafkaInput needs no per-session preparation.
func (i *KafkaInput) Setup(_ sarama.ConsumerGroupSession) error { return nil }

// Cleanup is called at the end of a session, after every ConsumeClaim
// goroutine has returned. KafkaInput has nothing to release here.
func (i *KafkaInput) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }