sync.WaitGroup 的协程在最后一个 wg.Done() 之前结束

Goroutines with sync.WaitGroup end before last wg.Done()

我有一个示例代码(您可以在 Go Playground 上找到它):

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    messages := make(chan int)
    var wg sync.WaitGroup
    var result []int

    // you can also add these one at 
    // a time if you need to 

    wg.Add(1)
    go func() {
        defer wg.Done()
        time.Sleep(time.Second * 1)
        messages <- 1
    }()
    wg.Add(1)
    go func() {
        defer wg.Done()
        time.Sleep(time.Second * 1)
        messages <- 2
    }() 
    wg.Add(1)
    go func() {
        defer wg.Done()
        time.Sleep(time.Second * 1)
        messages <- 3
    }()
    go func() {
        for i := range messages {
            fmt.Println(i)
        result = append(result, i)
        }

    }()

    wg.Wait()
    fmt.Println(result)
}

我得到了这个输出:

2
1
[2 1]

我想我知道为什么会这样,但我无法解决它。 WaitGroup 中有 3 个项目,我的意思是三个 goroutine,第 4 个 groutine 使用通道中的数据。当最后一个 goroutine 说 wg.Done() 程序结束时,因为 wg.Wait() 说每个 goroutine 都完成了,最后一个 goroutine 结果第四个 goroutine 不能消费,因为程序结束了。我尝试在第 4 个函数中使用 wg.Add(1) 和 wg.Done() 添加加一,但在这种情况下我遇到了死锁。

你产生的最后一个 goroutine —— 那个打算收集结果的 goroutine —— 没有被 main() 等待,所以 wg.Wait() 在 returns,main() 退出并收获剩下的 goroutines。 据说到那时只有一个收集 goroutine 仍然存在,但它无法更新切片。

另请注意,由于相同的原因,您的程序中存在数据竞争:当 main() 读取结果片段时,它 不知道 读取它是否安全——也就是说,作者是否已经写完了。

一个简单的解决方法是为该 goroutine 添加 wg.Add(1) 并在其中添加 defer wg.Done()

更好的解决方案是 close() wg.Wait() 之后的 messages 频道 在从切片中读取之前。这将使收集 goroutine 的 range 循环终止,这也会在该 goroutine 和 main().

之间创建一个适当的同步点

关闭通道是一种惯用的 Go 信号模式,如果关闭缓冲通道,消费者可以读取所有排队的数据然后停止。

这段代码可以正常工作:

func main() {
    messages := make(chan int)
    var wg sync.WaitGroup
    var result []int

    // you can also add these one at
    // a time if you need to

    wg.Add(1)
    go func() {
        defer wg.Done()
        time.Sleep(time.Second * 1)
        messages <- 1
    }()
    wg.Add(1)
    go func() {
        defer wg.Done()
        time.Sleep(time.Second * 1)
        messages <- 2
    }()
    wg.Add(1)
    go func() {
        defer wg.Done()
        time.Sleep(time.Second * 1)
        messages <- 3
    }()

    // this goroutine added to signal end of data stream
    // by closing messages channel
    go func() {
        wg.Wait()
        close(messages)
    }()

    // if you need this to happen inside a go routine,
    // this channel is used for signalling end of the work,
    // also another sync.WaitGroup could be used, but for just one
    // goroutine, a single channel as a signal makes sense (there is no
    // groups)
    done := make(chan struct{})
    go func() {
        defer close(done)
        for i := range messages {
            fmt.Println(i)
            result = append(result, i)
        }
    }()

    <-done
    fmt.Println(result)
}

如您所见,我们刚刚添加了另一个 goroutine,当所有生产者都完成后,它会关闭 messages 通道。

kostix 的回答是正确的,直到他们提到

An easy fix is to do add wg.Add(1) for that goroutine and defer wg.Done() in it, too.

如果不关闭消息通道,那将导致您的循环永远无法完成!所以主 goroutine 会在你最后一个 "collecting" goroutine 完成之前再次完成。如果将 goroutine 绑定到永远不会发送 Done() 信号的 wg WaitGroup,您也会收到错误消息。

然后当他们提到

A better solution is to close() the messages channel after wg.Wait() and before reading from the slice

他们建议的放置将再次给您同样的错误,因为您将在同一个 WaitGroup wg 上等待。虽然您的最后一个 "collecting" goroutine 将继续在您的 messages 频道中寻找更多消息,并且永远不会到达延迟的 wg.Done()

然后 Alex Yu 的评论通过在完全阅读结果之前等待来修复它,这是一个很好的修复。但是如果你想让你的收集 goroutine 立即开始而不是等待所有以前的 goroutines(写入 messages 通道)在它开始从所述通道读取之前完成,我会建议以下......

创建结果 WaitGroup,Add(1) 在开始你的最后一个 "collecting" goroutine 之前,defer wgResult.Done() 在你最后一个 "collecting" goroutine 中,然后在最后,在你的 wg.Wait() 和你的 fmt.Println(result),你应该 close(messages)wgResult.Wait().

这允许您所有的 go 例程尽快启动,并且仅在需要时等待写入 goroutines 和读取 goroutines。

这是带有建议解决方案的 GoPlayground link

https://play.golang.org/p/na0JS1HTwNP