从 Sarama 的错误通道中读取的正确方法是什么?
What is the proper way to read from the errors channel in Sarama?
我正在使用用 Go 编写的 Sarama 库在生成消息时从错误通道读取。整体代码如下所示,包含在一个函数中:
producer.AsyncProducer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.ByteEncoder(message)}
go func() {
for err := range saramaProducer.Errors() {
if producer.callbacks.OnError != nil {
producer.callbacks.OnError(err)
}
}
}()
根据我对 go 例程的理解,我的 go 例程会不断迭代 Errors()
通道,直到它收到一个通道。有没有办法让它在我的函数执行完毕后停止侦听错误?
您可以使用另一个频道和一个 select
来制作循环 return。
var quit chan struct{}
go func() {
for {
select {
case err:=<-saramaProducer.Errors():
//handle errors
case <-quit:
return
}
}
}
defer func() { quit<-struct{}{} }()
最初的 for ... range
循环在获得频道之前不会迭代频道。相反,它会阻塞直到收到错误、处理它并再次等待新错误,直到通道关闭或 main
returns.
上面的代码有个小问题,就是当quit
和error channel都准备好了的时候,select
会随机抽取一个,这样可能会造成单个错误丢失。如果这值得处理,只需将另一个 switch
与 default
放在一起即可得到该错误,然后 return
.
我正在使用用 Go 编写的 Sarama 库在生成消息时从错误通道读取。整体代码如下所示,包含在一个函数中:
producer.AsyncProducer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.ByteEncoder(message)}
go func() {
for err := range saramaProducer.Errors() {
if producer.callbacks.OnError != nil {
producer.callbacks.OnError(err)
}
}
}()
根据我对 go 例程的理解,我的 go 例程会不断迭代 Errors()
通道,直到它收到一个通道。有没有办法让它在我的函数执行完毕后停止侦听错误?
您可以使用另一个频道和一个 select
来制作循环 return。
var quit chan struct{}
go func() {
for {
select {
case err:=<-saramaProducer.Errors():
//handle errors
case <-quit:
return
}
}
}
defer func() { quit<-struct{}{} }()
最初的 for ... range
循环在获得频道之前不会迭代频道。相反,它会阻塞直到收到错误、处理它并再次等待新错误,直到通道关闭或 main
returns.
上面的代码有个小问题,就是当quit
和error channel都准备好了的时候,select
会随机抽取一个,这样可能会造成单个错误丢失。如果这值得处理,只需将另一个 switch
与 default
放在一起即可得到该错误,然后 return
.