使用 Shopify Sarama 的 Kafka 错误处理

Kafka Error Handling using Shopify Sarama

所以我正在尝试将 Kafka 用于我的应用程序,它有一个生产者将操作记录到 Kafka MQ 中,消费者从 MQ.Since 中读取它,我的应用程序在 Go 中,我正在使用 Shopify Sarama使这成为可能。

现在,我可以读取 MQ 并使用

打印消息内容
fmt.Printf

但是,我真的希望错误处理比控制台打印更好,而且我愿意付出更多努力。

现在用于消费者连接的代码:

mqCfg := sarama.NewConfig()

master, err := sarama.NewConsumer([]string{brokerConnect}, mqCfg)
if err != nil {
    panic(err) // Don't want to panic when error occurs, instead handle it
}

以及消息的处理:

    go func() {
    defer wg.Done()
    for message := range consumer.Messages() {
        var msgContent Message
        _ = json.Unmarshal(message.Value, &msgContent)
        fmt.Printf("Reading message of type %s with id : %d\n", msgContent.Type, msgContent.ContentId) //Don't want to print it
    }
}()

我的问题(我是测试 Kafka 的新手,一般来说是 kafka 的新手):

  1. 上面的程序哪里会出错,我可以处理吗?任何示例代码对我来说都是很好的开始。我能想到的错误情况是 msgContent 在 JSON 中实际上不包含任何类型的 ContentId 字段。

  2. 在 kafka 中,是否存在消费者试图读取当前偏移量但由于某种原因无法读取的情况(即使 JSON 格式正确)?我的消费者是否有可能回溯到在失败的偏移读取之上 x 步并重新处理偏移?或者有更好的方法吗?再一次,这些情况会是什么?

我乐于阅读和尝试。

关于 1) 检查我在下面记录错误消息的位置。这或多或少是我会做的。

关于 2) 我不知道如何在主题中倒退。只需一遍又一遍地创建一个消费者,它的起始偏移量每次都减一,这是非常有可能的。但我不建议这样做,因为您很可能最终会一遍又一遍地重播相同的消息。我确实建议经常保存您的偏移量,以便在情况恶化时您可以恢复。

我相信下面的代码可以解决您的大部分问题。我没试过编译这个。而且 sarama api 最近一直在变化,所以 api 目前可能有点不同。

func StartKafkaReader(wg *sync.WaitGroup, lastgoodoff int64, out chan<- *Message) (error) {
    wg.Add(1)
    go func(){
        defer wg.Done()
        //to track the last known good offset we processed, which is 
        // updated after each successfully processed event. 
        saveprogress := func(off int64){
            //Save the offset somewhere...a file... 
            //Ive also used kafka to store progress 
            //using a special topic as a WAL
        }
        defer saveprogress(lastgoodoffset)

        client, err := sarama.NewClient("clientId", brokers, sarama.NewClientConfig())
        if err != nil {
            log.Error(err)
            return
        }
        defer client.Close()
        sarama.NewConsumerConfig()
        consumerConfig.OffsetMethod = sarama.OffsetMethodManual
        consumerConfig.OffsetValue = int64(lastgoodoff)
        consumer, err := sarama.NewConsumer(client, topic, partition, "consumerId", consumerConfig)
        if err != nil {
            log.Error(err)
            return
        }
        defer consumer.Close()
        for {
            select {
            case event := <-consumer.Events():
                if event.Err != nil {
                    log.Error(event.Err)
                    return
                }
                msgContent := &Message{}
                err = json.Unmarshal(message.Value, msgContent)
                if err != nil {
                    log.Error(err)
                    continue //continue to skip this message or return to stop without updating the offset.
                }
                // Send the message on to be processed.
                out <- msgContent 

                lastgoodoff = event.Offset
            }
        }
    }()
}