关闭缓冲通道时是否应该排空它
Should one drain a buffered channel when closing it
在 Go 中给定一个(部分)填充的缓冲通道
ch := make(chan *MassiveStruct, n)
for i := 0; i < n; i++ {
ch <- NewMassiveStruct()
}
是否建议在关闭频道时(由作者)也排空频道,以防不知道读者何时从中读取(例如,数量有限,他们目前很忙)?即
close(ch)
for range ch {}
如果通道上有其他并发读者,这样的循环是否保证结束?
Context:具有固定数量工作人员的队列服务,它应该在服务停止时停止处理任何排队的东西(但不一定在之后立即进行 GC)。所以我关闭是为了向工作人员表明服务正在终止。我可以立即耗尽剩余的 "queue" 让 GC 释放分配的资源,我可以读取并忽略工作人员中的值,我可以按 运行 离开阅读器并将频道设置为作者中的 nil 以便 GC 清理所有内容。我不确定哪种方式最干净。
这取决于你的程序,但一般来说我会倾向于说不(你不需要在关闭频道之前清除它):如果你关闭它时你的频道中有项目,任何reader 仍然从频道读取将收到项目,直到频道为空。
这是一个例子:
package main
import (
"sync"
"time"
)
func main() {
var ch = make(chan int, 5)
var wg sync.WaitGroup
wg.Add(1)
for range make([]struct{}, 2) {
go func() {
for i := range ch {
wg.Wait()
println(i)
}
}()
}
for i := 0; i < 5; i++ {
ch <- i
}
close(ch)
wg.Done()
time.Sleep(1 * time.Second)
}
这里,程序将输出所有项目,尽管通道在任何 reader 甚至可以从通道读取之前就已严格关闭。
有更好的方法可以实现您想要实现的目标。您当前的方法只会导致丢弃一些记录,并随机处理其他记录(因为耗尽循环正在争夺所有消费者)。这并没有真正解决目标。
你要的是取消。这是来自 Go Concurrency Patterns: Pipelines and cancellation
的示例
func sq(done <-chan struct{}, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-done:
return
}
}
}()
return out
}
你将一个 done
通道传递给所有的 goroutines,当你希望它们都停止处理时你关闭它。如果你经常这样做,你可能会发现 golang.org/x/net/context
包很有用,它使这种模式形式化,并添加了一些额外的特性(比如超时)。
我觉得除了 既不需要排水也不需要关闭 的提示外,我觉得所提供的答案实际上并没有说明太多。因此,针对所描述上下文的以下解决方案对我来说看起来很干净,它终止了工作人员并删除了对他们或相关频道的所有引用,因此,让 GC 清理频道及其内容:
type worker struct {
submitted chan Task
stop chan bool
p *Processor
}
// executed in a goroutine
func (w *worker) run() {
for {
select {
case task := <-w.submitted:
if err := task.Execute(w.p); err != nil {
logger.Error(err.Error())
}
case <-w.stop:
logger.Warn("Worker stopped")
return
}
}
}
func (p *Processor) Stop() {
if atomic.CompareAndSwapInt32(&p.status, running, stopped) {
for _, w := range p.workers {
w.stop <- true
}
// GC all workers as soon as goroutines stop
p.workers = nil
// GC all published data when workers terminate
p.submitted = nil
// no need to do the following above:
// close(p.submitted)
// for range p.submitted {}
}
}
在 Go 中给定一个(部分)填充的缓冲通道
ch := make(chan *MassiveStruct, n)
for i := 0; i < n; i++ {
ch <- NewMassiveStruct()
}
是否建议在关闭频道时(由作者)也排空频道,以防不知道读者何时从中读取(例如,数量有限,他们目前很忙)?即
close(ch)
for range ch {}
如果通道上有其他并发读者,这样的循环是否保证结束?
Context:具有固定数量工作人员的队列服务,它应该在服务停止时停止处理任何排队的东西(但不一定在之后立即进行 GC)。所以我关闭是为了向工作人员表明服务正在终止。我可以立即耗尽剩余的 "queue" 让 GC 释放分配的资源,我可以读取并忽略工作人员中的值,我可以按 运行 离开阅读器并将频道设置为作者中的 nil 以便 GC 清理所有内容。我不确定哪种方式最干净。
这取决于你的程序,但一般来说我会倾向于说不(你不需要在关闭频道之前清除它):如果你关闭它时你的频道中有项目,任何reader 仍然从频道读取将收到项目,直到频道为空。
这是一个例子:
package main
import (
"sync"
"time"
)
func main() {
var ch = make(chan int, 5)
var wg sync.WaitGroup
wg.Add(1)
for range make([]struct{}, 2) {
go func() {
for i := range ch {
wg.Wait()
println(i)
}
}()
}
for i := 0; i < 5; i++ {
ch <- i
}
close(ch)
wg.Done()
time.Sleep(1 * time.Second)
}
这里,程序将输出所有项目,尽管通道在任何 reader 甚至可以从通道读取之前就已严格关闭。
有更好的方法可以实现您想要实现的目标。您当前的方法只会导致丢弃一些记录,并随机处理其他记录(因为耗尽循环正在争夺所有消费者)。这并没有真正解决目标。
你要的是取消。这是来自 Go Concurrency Patterns: Pipelines and cancellation
的示例func sq(done <-chan struct{}, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-done:
return
}
}
}()
return out
}
你将一个 done
通道传递给所有的 goroutines,当你希望它们都停止处理时你关闭它。如果你经常这样做,你可能会发现 golang.org/x/net/context
包很有用,它使这种模式形式化,并添加了一些额外的特性(比如超时)。
我觉得除了 既不需要排水也不需要关闭 的提示外,我觉得所提供的答案实际上并没有说明太多。因此,针对所描述上下文的以下解决方案对我来说看起来很干净,它终止了工作人员并删除了对他们或相关频道的所有引用,因此,让 GC 清理频道及其内容:
type worker struct {
submitted chan Task
stop chan bool
p *Processor
}
// executed in a goroutine
func (w *worker) run() {
for {
select {
case task := <-w.submitted:
if err := task.Execute(w.p); err != nil {
logger.Error(err.Error())
}
case <-w.stop:
logger.Warn("Worker stopped")
return
}
}
}
func (p *Processor) Stop() {
if atomic.CompareAndSwapInt32(&p.status, running, stopped) {
for _, w := range p.workers {
w.stop <- true
}
// GC all workers as soon as goroutines stop
p.workers = nil
// GC all published data when workers terminate
p.submitted = nil
// no need to do the following above:
// close(p.submitted)
// for range p.submitted {}
}
}