关闭缓冲通道时是否应该排空它

Should one drain a buffered channel when closing it

在 Go 中给定一个(部分)填充的缓冲通道

ch := make(chan *MassiveStruct, n)
for i := 0; i < n; i++ {
    ch <- NewMassiveStruct()
}

是否建议在关闭频道时(由作者)也排空频道,以防不知道读者何时从中读取(例如,数量有限,他们目前很忙)?即

close(ch)
for range ch {}

如果通道上有其他并发读者,这样的循环是否保证结束?

Context:具有固定数量工作人员的队列服务,它应该在服务停止时停止处理任何排队的东西(但不一定在之后立即进行 GC)。所以我关闭是为了向工作人员表明服务正在终止。我可以立即耗尽剩余的 "queue" 让 GC 释放分配的资源,我可以读取并忽略工作人员中的值,我可以按 运行 离开阅读器并将频道设置为作者中的 nil 以便 GC 清理所有内容。我不确定哪种方式最干净。

这取决于你的程序,但一般来说我会倾向于说不(你不需要在关闭频道之前清除它):如果你关闭它时你的频道中有项目,任何reader 仍然从频道读取将收到项目,直到频道为空。

这是一个例子:

package main

import (
    "sync"
    "time"
)

func main() {

    var ch = make(chan int, 5)
    var wg sync.WaitGroup
    wg.Add(1)

    for range make([]struct{}, 2) {
        go func() {
            for i := range ch {
                wg.Wait()
                println(i)
            }
        }()
    }

    for i := 0; i < 5; i++ {
        ch <- i
    }
    close(ch)

    wg.Done()
    time.Sleep(1 * time.Second)
}

这里,程序将输出所有项目,尽管通道在任何 reader 甚至可以从通道读取之前就已严格关闭。

有更好的方法可以实现您想要实现的目标。您当前的方法只会导致丢弃一些记录,并随机处理其他记录(因为耗尽循环正在争夺所有消费者)。这并没有真正解决目标。

你要的是取消。这是来自 Go Concurrency Patterns: Pipelines and cancellation

的示例
func sq(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}

你将一个 done 通道传递给所有的 goroutines,当你希望它们都停止处理时你关闭它。如果你经常这样做,你可能会发现 golang.org/x/net/context 包很有用,它使这种模式形式化,并添加了一些额外的特性(比如超时)。

我觉得除了 既不需要排水也不需要关闭 的提示外,我觉得所提供的答案实际上并没有说明太多。因此,针对所描述上下文的以下解决方案对我来说看起来很干净,它终止了工作人员并删除了对他们或相关频道的所有引用,因此,让 GC 清理频道及其内容:

type worker struct {
    submitted chan Task
    stop      chan bool
    p         *Processor
}

// executed in a goroutine
func (w *worker) run() {
    for {
        select {
        case task := <-w.submitted:
            if err := task.Execute(w.p); err != nil {
                logger.Error(err.Error())
            }
        case <-w.stop:
            logger.Warn("Worker stopped")
            return
        }
    }
}

func (p *Processor) Stop() {
    if atomic.CompareAndSwapInt32(&p.status, running, stopped) {
        for _, w := range p.workers {
            w.stop <- true
        }
        // GC all workers as soon as goroutines stop
        p.workers = nil
        // GC all published data when workers terminate
        p.submitted = nil
        // no need to do the following above:
        // close(p.submitted)
        // for range p.submitted {}
    }
}