固定数量工人模式的竞争条件
Race condition on fixed number of workers pattern
我出于学习目的正在玩一些代码,在使用 -race
标志时我在执行时遇到了竞争条件,我想了解原因。该代码启动了一组固定的 goroutine,这些 goroutine 充当工作人员从通道中消费任务,没有固定数量的任务,只要通道接收到任务,工作人员就必须继续工作。
我在调用 WaitGroup
函数时遇到竞争条件。据我了解(查看数据竞争报告),当第一个 wg.Add
调用由一个派生的 goroutine 执行并且主例程同时调用 wg.Wait
时,就会发生竞争情况。那是对的吗?如果是,这意味着我必须始终在主例程上执行对 Add 的调用以避免这种资源竞争?但是,这也意味着我需要知道工人需要提前处理多少任务,如果我需要代码处理一旦工人 运行 ...
代码:
func Test(t *testing.T) {
t.Run("", func(t *testing.T) {
var wg sync.WaitGroup
queuedTaskC := make(chan func())
for i := 0; i < 5; i++ {
wID := i + 1
go func(workerID int) {
for task := range queuedTaskC {
wg.Add(1)
task()
}
}(wID)
}
taskFn := func() {
fmt.Println("executing task...")
wg.Done()
}
queuedTaskC <- taskFn
queuedTaskC <- taskFn
queuedTaskC <- taskFn
queuedTaskC <- taskFn
queuedTaskC <- taskFn
queuedTaskC <- taskFn
queuedTaskC <- taskFn
queuedTaskC <- taskFn
queuedTaskC <- taskFn
wg.Wait()
close(queuedTaskC)
fmt.Println(len(queuedTaskC))
})
}
报告:
==================
WARNING: DATA RACE
Read at 0x00c0001280d8 by goroutine 11:
internal/race.Read()
/src/internal/race/race.go:37 +0x206
sync.(*WaitGroup).Add()
/src/sync/waitgroup.go:71 +0x219
workerpool.Test.func1.1()
/workerpool/workerpool_test.go:36 +0x64
Previous write at 0x00c0001280d8 by goroutine 8:
internal/race.Write()
/src/internal/race/race.go:41 +0x125
sync.(*WaitGroup).Wait()
/src/sync/waitgroup.go:128 +0x126
workerpool.Test.func1()
/workerpool/workerpool_test.go:57 +0x292
testing.tRunner()
/src/testing/testing.go:1123 +0x202
Goroutine 11 (running) created at:
workerpool.Test.func1()
/workerpool/workerpool_test.go:34 +0xe4
testing.tRunner()
/src/testing/testing.go:1123 +0x202
Goroutine 8 (running) created at:
testing.(*T).Run()
/src/testing/testing.go:1168 +0x5bb
workerpool.Test()
workerpool_test.go:29 +0x4c
testing.tRunner()
/src/testing/testing.go:1123 +0x202
==================
WaitGroup
实现是基于由 Add
和 Done
方法更改的内部计数器。在计数器归零之前,Wait
方法不会 return。也可以重用 WaitGroup
但在文档中描述的某些条件下:
// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.
虽然您的代码没有重复使用 wg
,但它可以多次将 WaitGroup
计数器归零。当在给定时间没有任务正在处理时会发生这种情况,这在并发代码中是完全可能的。由于您的代码在调用 Add
之前不会等待 Wait
到 return,因此您会收到竞争条件错误。
正如大家在评论中建议的那样,您应该放弃使用 WaitGroup
跟踪任务的想法,转而控制 运行 goroutines。附上代码提案。
func Test(t *testing.T) {
var wg sync.WaitGroup
queuedTaskC := make(chan func(), 10)
for i := 0; i < 5; i++ {
wID := i + 1
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for task := range queuedTaskC {
task()
}
}(wID)
}
for i := 0; i < 10; i++ {
queuedTaskC <- func() {
fmt.Println("executing task...")
}
}
close(queuedTaskC)
wg.Wait()
fmt.Println(len(queuedTaskC))
}
我出于学习目的正在玩一些代码,在使用 -race
标志时我在执行时遇到了竞争条件,我想了解原因。该代码启动了一组固定的 goroutine,这些 goroutine 充当工作人员从通道中消费任务,没有固定数量的任务,只要通道接收到任务,工作人员就必须继续工作。
我在调用 WaitGroup
函数时遇到竞争条件。据我了解(查看数据竞争报告),当第一个 wg.Add
调用由一个派生的 goroutine 执行并且主例程同时调用 wg.Wait
时,就会发生竞争情况。那是对的吗?如果是,这意味着我必须始终在主例程上执行对 Add 的调用以避免这种资源竞争?但是,这也意味着我需要知道工人需要提前处理多少任务,如果我需要代码处理一旦工人 运行 ...
代码:
func Test(t *testing.T) {
t.Run("", func(t *testing.T) {
var wg sync.WaitGroup
queuedTaskC := make(chan func())
for i := 0; i < 5; i++ {
wID := i + 1
go func(workerID int) {
for task := range queuedTaskC {
wg.Add(1)
task()
}
}(wID)
}
taskFn := func() {
fmt.Println("executing task...")
wg.Done()
}
queuedTaskC <- taskFn
queuedTaskC <- taskFn
queuedTaskC <- taskFn
queuedTaskC <- taskFn
queuedTaskC <- taskFn
queuedTaskC <- taskFn
queuedTaskC <- taskFn
queuedTaskC <- taskFn
queuedTaskC <- taskFn
wg.Wait()
close(queuedTaskC)
fmt.Println(len(queuedTaskC))
})
}
报告:
==================
WARNING: DATA RACE
Read at 0x00c0001280d8 by goroutine 11:
internal/race.Read()
/src/internal/race/race.go:37 +0x206
sync.(*WaitGroup).Add()
/src/sync/waitgroup.go:71 +0x219
workerpool.Test.func1.1()
/workerpool/workerpool_test.go:36 +0x64
Previous write at 0x00c0001280d8 by goroutine 8:
internal/race.Write()
/src/internal/race/race.go:41 +0x125
sync.(*WaitGroup).Wait()
/src/sync/waitgroup.go:128 +0x126
workerpool.Test.func1()
/workerpool/workerpool_test.go:57 +0x292
testing.tRunner()
/src/testing/testing.go:1123 +0x202
Goroutine 11 (running) created at:
workerpool.Test.func1()
/workerpool/workerpool_test.go:34 +0xe4
testing.tRunner()
/src/testing/testing.go:1123 +0x202
Goroutine 8 (running) created at:
testing.(*T).Run()
/src/testing/testing.go:1168 +0x5bb
workerpool.Test()
workerpool_test.go:29 +0x4c
testing.tRunner()
/src/testing/testing.go:1123 +0x202
==================
WaitGroup
实现是基于由 Add
和 Done
方法更改的内部计数器。在计数器归零之前,Wait
方法不会 return。也可以重用 WaitGroup
但在文档中描述的某些条件下:
// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.
虽然您的代码没有重复使用 wg
,但它可以多次将 WaitGroup
计数器归零。当在给定时间没有任务正在处理时会发生这种情况,这在并发代码中是完全可能的。由于您的代码在调用 Add
之前不会等待 Wait
到 return,因此您会收到竞争条件错误。
正如大家在评论中建议的那样,您应该放弃使用 WaitGroup
跟踪任务的想法,转而控制 运行 goroutines。附上代码提案。
func Test(t *testing.T) {
var wg sync.WaitGroup
queuedTaskC := make(chan func(), 10)
for i := 0; i < 5; i++ {
wID := i + 1
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for task := range queuedTaskC {
task()
}
}(wID)
}
for i := 0; i < 10; i++ {
queuedTaskC <- func() {
fmt.Println("executing task...")
}
}
close(queuedTaskC)
wg.Wait()
fmt.Println(len(queuedTaskC))
}