了解 goroutines 同步

Understanding goroutines synchronization

我正在尝试了解 golang channelssynchronization。 当我 运行 我的程序使用 race detector 时,它会导致竞态检测。

我的程序:

func main() {
    ch := make(chan int)
    done := make(chan struct{})
    wg := sync.WaitGroup{}

    go func() {
        defer close(ch)
        defer close(done)
        wg.Wait()
        done <- struct{}{}
    }()

    for i := 0; i < 5; i++ {
        x := i
        wg.Add(1)
        go func() {
            defer wg.Done()
            fmt.Println("Value: ", x)
            ch <- x
        }()
    }
    
loop:
    for {
        select {
        case i := <-ch:
            fmt.Println("Value: ", i)
        case <- done:
            break loop
        }
    }
}

竞争检测器报告:

==================
WARNING: DATA RACE
Write at 0x00c000020148 by goroutine 7:
  internal/race.Write()
      /home/linuxbrew/.linuxbrew/Cellar/go/1.16.5/libexec/src/internal/race/race.go:41 +0x125
  sync.(*WaitGroup).Wait()
      /home/linuxbrew/.linuxbrew/Cellar/go/1.16.5/libexec/src/sync/waitgroup.go:128 +0x126
  main.main.func1()
      /home/reddy/code/github.com/awesomeProject/prod.go:106 +0xc4

Previous read at 0x00c000020148 by main goroutine:
  internal/race.Read()
      /home/linuxbrew/.linuxbrew/Cellar/go/1.16.5/libexec/src/internal/race/race.go:37 +0x206
  sync.(*WaitGroup).Add()
      /home/linuxbrew/.linuxbrew/Cellar/go/1.16.5/libexec/src/sync/waitgroup.go:71 +0x219
  main.main()
      /home/reddy/code/github.com/awesomeProject/prod.go:112 +0x124

Goroutine 7 (running) created at:
  main.main()
      /home/reddy/code/github.com/awesomeProject/prod.go:103 +0x104
==================

我不知道这里出了什么问题。

我的分析:

  1. wg.Add(1) 正在递增计数器
  2. wg.Done() 在 goroutine 的末尾调用,它递减计数器
  3. ch <- x 这应该是一个阻塞调用,因为它是非缓冲通道
  4. loop 应该迭代直到 done 通道有一些消息,当 waitgroup 计数器归零时,即所有 5 个 goroutine 都发布了消息
  5. 一旦计数器归零,wg goroutine 将恢复并调用 done,一旦消息在主循环中被消耗,它就会中断循环并正常退出。

程序在对 wg.Add and the call to wg.Wait 的调用之间存在竞争。这些调用可以以任何顺序发生。在调用 wg.Add.

之前调用 wg.Wait 时,对 wg.Wait 的调用不会等待任何 goroutines

通过在 启动调用 wg.Wait 的 goroutine 之前将调用移动到 wg.Add 来修复。此更改可确保对 wg.Add 的调用发生在对 wg.Wait.

的调用之前
for i := 0; i < 5; i++ {
    x := i
    wg.Add(1)
    go func() {
        defer wg.Done()
        fmt.Println("Value: ", x)
        ch <- x
    }()
}

go func() {
    defer close(ch)
    defer close(done)
    wg.Wait()
    done <- struct{}{}
}()

WaitGroup 类型具有在 运行 在竞争检测器 (modeled read, modeled write) 下检查此错误的代码。

ch 关闭时,通过跳出主 goroutine 中的循环来简化代码。不需要 done 频道。

ch := make(chan int)
wg := sync.WaitGroup{}

for i := 0; i < 5; i++ {
    x := i
    wg.Add(1)
    go func() {
        defer wg.Done()
        fmt.Println("Value: ", x)
        ch <- x
    }()
}

go func() {
    wg.Wait()
    close(ch)
}()

for i := range ch {
    fmt.Println("Value: ", i)
}