固定数量工人模式的竞争条件

Race condition on fixed number of workers pattern

我出于学习目的正在玩一些代码,在使用 -race 标志时我在执行时遇到了竞争条件,我想了解原因。该代码启动了一组固定的 goroutine,这些 goroutine 充当工作人员从通道中消费任务,没有固定数量的任务,只要通道接收到任务,工作人员就必须继续工作。

我在调用 WaitGroup 函数时遇到竞争条件。据我了解(查看数据竞争报告),当第一个 wg.Add 调用由一个派生的 goroutine 执行并且主例程同时调用 wg.Wait 时,就会发生竞争情况。那是对的吗?如果是,这意味着我必须始终在主例程上执行对 Add 的调用以避免这种资源竞争?但是,这也意味着我需要知道工人需要提前处理多少任务,如果我需要代码处理一旦工人 运行 ...

代码:

func Test(t *testing.T) {
    t.Run("", func(t *testing.T) {
        var wg sync.WaitGroup
        queuedTaskC := make(chan func())
        for i := 0; i < 5; i++ {
            wID := i + 1
            go func(workerID int) {
                for task := range queuedTaskC {
                    wg.Add(1)
                    task()
                }
            }(wID)
        }

        taskFn := func() {
            fmt.Println("executing task...")
            wg.Done()
        }
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn

        wg.Wait()
        close(queuedTaskC)

        fmt.Println(len(queuedTaskC))
    })
}

报告:

==================
WARNING: DATA RACE
Read at 0x00c0001280d8 by goroutine 11:
  internal/race.Read()
      /src/internal/race/race.go:37 +0x206
  sync.(*WaitGroup).Add()
      /src/sync/waitgroup.go:71 +0x219
  workerpool.Test.func1.1()
      /workerpool/workerpool_test.go:36 +0x64

Previous write at 0x00c0001280d8 by goroutine 8:
  internal/race.Write()
      /src/internal/race/race.go:41 +0x125
  sync.(*WaitGroup).Wait()
      /src/sync/waitgroup.go:128 +0x126
  workerpool.Test.func1()
      /workerpool/workerpool_test.go:57 +0x292
  testing.tRunner()
      /src/testing/testing.go:1123 +0x202

Goroutine 11 (running) created at:
  workerpool.Test.func1()
      /workerpool/workerpool_test.go:34 +0xe4
  testing.tRunner()
      /src/testing/testing.go:1123 +0x202

Goroutine 8 (running) created at:
  testing.(*T).Run()
      /src/testing/testing.go:1168 +0x5bb
  workerpool.Test()
      workerpool_test.go:29 +0x4c
  testing.tRunner()
      /src/testing/testing.go:1123 +0x202
==================

WaitGroup 实现是基于由 AddDone 方法更改的内部计数器。在计数器归零之前,Wait 方法不会 return。也可以重用 WaitGroup 但在文档中描述的某些条件下:

// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.

虽然您的代码没有重复使用 wg,但它可以多次将 WaitGroup 计数器归零。当在给定时间没有任务正在处理时会发生这种情况,这在并发代码中是完全可能的。由于您的代码在调用 Add 之前不会等待 Wait 到 return,因此您会收到竞争条件错误。

正如大家在评论中建议的那样,您应该放弃使用 WaitGroup 跟踪任务的想法,转而控制 运行 goroutines。附上代码提案。

func Test(t *testing.T) {
    var wg sync.WaitGroup
    queuedTaskC := make(chan func(), 10)
    for i := 0; i < 5; i++ {
        wID := i + 1
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            for task := range queuedTaskC {
                task()
            }
        }(wID)
    }
    for i := 0; i < 10; i++ {
        queuedTaskC <- func() {
            fmt.Println("executing task...")
        }
    }
    close(queuedTaskC)
    wg.Wait()
    fmt.Println(len(queuedTaskC))
}