Race 暂停一组 goroutines

Race pausing a group of goroutines

我有一堆 goroutines 在循环中做一些事情。我希望能够暂停所有这些,运行 一些任意代码,然后恢复它们。我尝试这样做的方式可能不是惯用的(我希望有更好的解决方案),但我不明白为什么它不起作用。

精简到最基本的部分(底部的驱动程序代码):

type looper struct {
    pause  chan struct{}
    paused sync.WaitGroup
    resume chan struct{}
}

func (l *looper) loop() {
    for {
        select {
        case <-l.pause:
            l.paused.Done()
            <-l.resume
        default:
            dostuff()
        }
    }
}

func (l *looper) whilePaused(fn func()) {
    l.paused.Add(32)
    l.resume = make(chan struct{})
    close(l.pause)
    l.paused.Wait()
    fn()
    l.pause = make(chan struct{})
    close(l.resume)
}

我启动了 32 个 goroutines 运行ning loop(),然后连续调用 whilePaused 100 次,一切似乎都正常......但是如果我 运行它与 -race,它告诉我在 l.resume 上有一个在 whilePaused (l.resume = make(chan struct{})) 中写入它和在 loop ([=19] 中读取它之间的竞争=]).

我不明白为什么会这样。根据 The Go Memory Modelclose(l.pause) 应该在每个 loop goroutine 中的 <-l.pause 之前发生。这应该意味着 make(chan struct{}) 值在所有这些 loop goroutines 中作为 l.resume 的值可见,就像字符串 "hello world" 作为 a 在文档示例中的 f goroutine 中。


一些可能相关的附加信息:


这是我的其余代码,如果您想 运行 整个代码:

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// looper code from above

var n int64    
func dostuff() {
    atomic.AddInt64(&n, 1)
}

func main() {
    l := &looper{
        pause: make(chan struct{}),
    }
    var init sync.WaitGroup
    init.Add(32)
    for i := 0; i < 32; i++ {
        go func() {
            init.Done()
            l.loop()
        }()
    }
    init.Wait()
    for i := 0; i < 100; i++ {
        l.whilePaused(func() { fmt.Printf("%d ", i) })
    }
    fmt.Printf("\n%d\n", atomic.LoadInt64(&n))
}

这是因为在线程执行 l.paused.Done() 之后,另一个线程能够绕过循环并再次分配 l.resume

这是操作顺序

Looper thread    |    Pauser thread
------------------------------------
l.paused.Done()  |   
                 |   l.paused.Wait()
                 |   l.pause = make(chan struct{})
                 |   round the loop
                 |   l.paused.Add(numThreads)
<- l.resume      |   l.resume = make(chan struct{})   !!!RACE!!