关闭未知长度的通道

Closing channel of unknown length

我无法关闭频道,因为不知道它
长度

package main

import (
    "fmt"
    "time"
)

func gen(ch chan int) {
    var i int
    for {
        time.Sleep(time.Millisecond * 10)
        ch <- i
        i++
        // when no more data (e.g. from db, or event stream)
        if i > 100 {
            break
        }
    }

    // hot to close it properly?
    close(ch)
}

func receiver(ch chan int) {
    for i := range ch {
        fmt.Println("received:", i)
    }
}

func main() {
    ch := make(chan int)

    for i := 0; i < 10; i++ {
        go gen(ch)
    }

    receiver(ch)
}

它给我错误

panic: send on closed channel

goroutine 8 [running]:
main.gen(0xc82001a0c0)
    /home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:12 +0x57
created by main.main
    /home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:35 +0xbd

goroutine 1 [panicwait]:
runtime.gopark(0x0, 0x0, 0x50b8e0, 0x9, 0x10, 0x1)
    /usr/lib/go/src/runtime/proc.go:185 +0x163
runtime.main()
    /usr/lib/go/src/runtime/proc.go:121 +0x2f4
runtime.goexit()
    /usr/lib/go/src/runtime/asm_amd64.s:1696 +0x1

goroutine 6 [sleep]:
time.Sleep(0x989680)
    /usr/lib/go/src/runtime/time.go:59 +0xf9
main.gen(0xc82001a0c0)
    /home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:11 +0x29
created by main.main
    /home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:33 +0x79

goroutine 7 [sleep]:
time.Sleep(0x989680)
    /usr/lib/go/src/runtime/time.go:59 +0xf9
main.gen(0xc82001a0c0)
    /home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:11 +0x29
created by main.main
    /home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:34 +0x9b
exit status 2

这是合乎逻辑的 - 第一个 goroutine 在第二个 goroutine 尝试发送给它时关闭通道。在这种情况下关闭通道的最佳方法是什么?

通道关闭后,您将无法在其上发送更多值,否则它会出现混乱。这就是你的经历。

这是因为您启动了多个使用相同通道的 goroutines,它们在该通道上发送值。然后您关闭每个频道中的频道。并且由于它们不同步,一旦第一个 goroutine 到达它关闭它的位置,其他 goroutine 可能(并且他们将)继续向其发送值:panic!

您只能关闭一次频道(尝试关闭已经关闭的频道也会出现恐慌)。当所有发送值的 goroutines 都完成时,你应该这样做。为此,您需要检测所有发送者 goroutine 何时完成。一种惯用的检测方法是使用 sync.WaitGroup.

对于每个启动的发送者 goroutine,我们使用 WaitGroup.Add(). And each goroutine that is done sending the values can signal this by calling WaitGroup.Done() 将 1 添加到 WaitGroup。最好将此作为延迟语句执行,因此如果您的 goroutine 突然终止(例如恐慌),WaitGroup.Done() 仍将被调用,并且不会让其他 goroutines 挂起(等待赦免 - "missing" WaitGroup.Done() 永远不会来的电话...)。

并且 WaitGroup.Wait() 会等到所有发送者 goroutine 都完成,并且只有在这之后并且只有一次才会关闭通道。我们想检测这个 "global" 完成事件并在处理发送到它的值时关闭通道,所以我们必须在它自己的 goroutine 中执行此操作。

接收器 goroutine 将 运行 直到通道关闭,因为我们在通道上使用了 for ... range 构造。并且由于它 运行s 在主 goroutine 中,程序不会退出,直到所有值都从通道正确接收和处理。 for ... range 构造循环,直到接收到在通道关闭之前发送的所有值。

请注意,下面的解决方案无需修改也适用于缓冲和非缓冲通道(尝试使用带有 ch := make(chan int, 100) 的缓冲通道)。

正确的解决方案(在 Go Playground 上尝试):

func gen(ch chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    var i int
    for {
        time.Sleep(time.Millisecond * 10)
        ch <- i
        i++
        // when no more data (e.g. from db, or event stream)
        if i > 100 {
            break
        }
    }
}

func receiver(ch chan int) {
    for i := range ch {
        fmt.Println("received:", i)
    }
}

func main() {
    ch := make(chan int)
    wg := &sync.WaitGroup{}

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go gen(ch, wg)
    }

    go func() {
        wg.Wait()
        close(ch)
    }()

    receiver(ch)
}

注:

请注意,重要的是 receiver(ch) 运行 在主 goroutine 中,代码等待 WaitGroup 并在其自身(非主)中关闭通道协程;而不是相反。如果您切换这 2 个,可能会导致 "early exit",即并非所有值都可以从通道接收和处理。这是因为 Go 程序在主 goroutine 完成时退出(规范:Program execution)。它不会等待其他(非主)goroutines 完成。因此,如果等待和关闭通道将在主 goroutine 中进行,则在关闭通道后程序可以随时退出,而不是等待另一个 goroutine 在这种情况下会循环接收来自通道的值。

"使用 Go 通道的一个一般原则是不要从接收方关闭通道,如果通道有多个并发发送者则不要关闭通道。"

每个频道一旦被标记为清理,最终都会被 GC 处理,所以可以不关闭频道,唯一的区别是该频道将在 gc 之后可用如果没有明确关闭,可能会出现循环。

不过,如果你能关闭频道总是好的。请通过以下链接进行详细说明。

文章 this and this 展示了在 1:N、N:1 或 M:N(发送者:接收者)

情况下关闭频道的各种方法