从 Sarama 的错误通道中读取的正确方法是什么?

What is the proper way to read from the errors channel in Sarama?

我正在使用用 Go 编写的 Sarama 库在生成消息时从错误通道读取。整体代码如下所示,包含在一个函数中:

producer.AsyncProducer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.ByteEncoder(message)}
go func() {
    for err := range saramaProducer.Errors() {
        if producer.callbacks.OnError != nil {
            producer.callbacks.OnError(err)
        }
    }
}()

根据我对 go 例程的理解,我的 go 例程会不断迭代 Errors() 通道,直到它收到一个通道。有没有办法让它在我的函数执行完毕后停止侦听错误?

您可以使用另一个频道和一个 select 来制作循环 return。

var quit chan struct{}
go func() {
    for {
        select {
        case err:=<-saramaProducer.Errors():
            //handle errors
        case <-quit:
            return
        }
    }
}
defer func() { quit<-struct{}{} }()

最初的 for ... range 循环在获得频道之前不会迭代频道。相反,它会阻塞直到收到错误、处理它并再次等待新错误,直到通道关闭或 main returns.

上面的代码有个小问题,就是当quit和error channel都准备好了的时候,select会随机抽取一个,这样可能会造成单个错误丢失。如果这值得处理,只需将另一个 switchdefault 放在一起即可得到该错误,然后 return.