X number of goroutines to update the same variable

I want X goroutines to update countValue in parallel (numRoutines being as many as the number of CPUs you have).

Solution 1:

func count(numRoutines int) (countValue int) {
    var mu sync.Mutex
    k := func(i int) {
        mu.Lock()
        defer mu.Unlock()
        countValue += 5
    }
    for i := 0; i < numRoutines; i++ {
        go k(i)
    }
    return // returns immediately, without waiting for the goroutines
}

This ends up as a data race, and the returned countValue is 0.
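The mutex in Solution 1 does protect the increments from each other, but count returns before the goroutines have run, and reading countValue at that point is unsynchronized with their writes. A minimal sketch of one fix, assuming you want to keep the mutex: add a sync.WaitGroup (not part of the original question) so count blocks until every goroutine has finished.

```go
package main

import (
    "fmt"
    "runtime"
    "sync"
)

// count adds 5 once per goroutine. The mutex serializes the increments and
// the WaitGroup makes count wait for all goroutines before returning.
func count(numRoutines int) (countValue int) {
    var mu sync.Mutex
    var wg sync.WaitGroup
    wg.Add(numRoutines) // one Done per goroutine
    for i := 0; i < numRoutines; i++ {
        go func() {
            defer wg.Done() // runs after the Unlock below (defers are LIFO)
            mu.Lock()
            defer mu.Unlock()
            countValue += 5
        }()
    }
    wg.Wait() // every increment happens-before Wait returns
    return
}

func main() {
    n := runtime.NumCPU()
    fmt.Println(count(n) == 5*n) // prints true
}
```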

Solution 2:

func count(numRoutines int) (countValue int) {
    k := func(i int, c chan int) {
        c <- 5
    }
    c := make(chan int)
    for i := 0; i < numRoutines; i++ {
        go k(i, c)
    }

    for i := 0; i < numRoutines; i++ {
        countValue += <- c
    }
    return
}

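I benchmarked it, and sequential addition is faster than using goroutines. I think this is because I have two for loops here, since when I put countValue += <- c inside the first for loop, the code ran faster.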
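Sequential addition winning is expected here: each goroutine performs a single addition, which is far cheaper than starting a goroutine and handing a value over a channel, so the coordination overhead dominates. The sketch below compares the two with testing.Benchmark, which the standard library documents as usable outside "go test". The names sequential, viaChannel and sink are hypothetical, n = 8 stands in for runtime.NumCPU(), and timings will vary by machine, so no numbers are shown.

```go
package main

import (
    "fmt"
    "testing"
)

// sink keeps the benchmarked results alive so the work is not optimized away.
var sink int

// sequential adds 5 n times in a plain loop.
func sequential(n int) (total int) {
    for i := 0; i < n; i++ {
        total += 5
    }
    return
}

// viaChannel mirrors Solution 2: one goroutine per value, summed over an unbuffered channel.
func viaChannel(n int) (total int) {
    c := make(chan int)
    for i := 0; i < n; i++ {
        go func() { c <- 5 }()
    }
    for i := 0; i < n; i++ {
        total += <-c
    }
    return
}

func main() {
    const n = 8 // hypothetical stand-in for runtime.NumCPU()
    seq := testing.Benchmark(func(b *testing.B) {
        for i := 0; i < b.N; i++ {
            sink = sequential(n)
        }
    })
    ch := testing.Benchmark(func(b *testing.B) {
        for i := 0; i < b.N; i++ {
            sink = viaChannel(n)
        }
    })
    fmt.Println("sequential:", seq)
    fmt.Println("viaChannel:", ch)
}
```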

Solution 3:

func count(numRoutines int) (countValue int) {
    var wg sync.WaitGroup

    c := make(chan int)

    k := func(i int) {
        defer wg.Done()
        c <- 5
    }
    for i := 0; i < numRoutines; i++ {
        wg.Add(1)
        go k(i)
    }

    go func() {
        for i := range c {
            countValue += i
        }
    }()

    wg.Wait()
    return
}

Still a race condition :/

Is there a better way?

There is definitely a better way to safely increment a variable: use sync/atomic:

import "sync/atomic"

var words int64
k := func() {
    _ = atomic.AddInt64(&words, 5) // increment atomically
}

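Using a channel basically removes the need for a mutex, or rather the risk of accessing the variable itself concurrently; the waitgroup here is a bit overkill.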

Channels:

words := 0
done := make(chan struct{}) // or use context
ch := make(chan int, numRoutines) // buffer so each routine can write
go func() {
    read := 0
    for range ch { // use `for v := range ch` if the received value is needed
        words += 5 // or add the received value instead
        read++
        if read == numRoutines {
            break // we've received data from all routines
        }
    }
    close(done) // indicate this routine has terminated
}()
for i := 0; i < numRoutines; i++ {
    ch <- i // write whatever value needs to be used in the counting routine on the channel
}
<-done    // wait for our routine that increments words to return
close(ch) // this channel is no longer needed
fmt.Printf("Counted %d\n", words)

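As you can see, numRoutines is no longer the number of routines but the number of writes on the channel. You can still move those writes into separate routines: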

for i := 0; i < numRoutines; i++ {
    go func(ch chan<- int, i int) {
        // do stuff here
        ch <- 5 * i // for example
    }(ch, i)
}
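Putting the reader and the per-goroutine writers together, a self-contained sketch might look like the following. It is one possible assembly of the two snippets above, not the answerer's exact code: the reader uses a bounded loop instead of range-plus-break (which also avoids blocking forever when numRoutines is 0), and the 5 * i payload is just the example value from the writer snippet. Only the reader goroutine touches words, and close(done) guarantees its final value is visible once <-done returns.

```go
package main

import (
    "fmt"
    "runtime"
)

// count starts numRoutines writers that each send one value; a single reader
// goroutine owns words, so no mutex or atomic is needed.
func count(numRoutines int) int {
    words := 0
    done := make(chan struct{})
    ch := make(chan int, numRoutines) // buffered so writers never block

    go func() {
        defer close(done) // signal that words holds its final value
        for read := 0; read < numRoutines; read++ {
            words += <-ch
        }
    }()

    for i := 0; i < numRoutines; i++ {
        go func(ch chan<- int, i int) {
            ch <- 5 * i // example per-goroutine value
        }(ch, i)
    }

    <-done // wait for the reader; words is now safe to read
    return words
}

func main() {
    n := runtime.NumCPU()
    // 5*0 + 5*1 + ... + 5*(n-1) == 5*n*(n-1)/2
    fmt.Println(count(n) == 5*n*(n-1)/2) // prints true
}
```

The channel is deliberately left open here: nothing ranges over it, so closing it is optional and it is garbage-collected once count returns.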

WaitGroup:

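Instead of using a context that can be cancelled, or a channel, you can use a waitgroup + atomic to get the same result. IMO the simplest way is to create a type: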

package main

import (
    "fmt"
    "runtime"
    "sync"
    "sync/atomic"
)

type counter struct {
    words int64
}

func (c *counter) doStuff(wg *sync.WaitGroup, i int) {
    defer wg.Done()
    atomic.AddInt64(&c.words, int64(i*5)) // whatever value you need to add
}

func main() {
    numRoutines := runtime.NumCPU()
    cnt := counter{}
    wg := sync.WaitGroup{}
    wg.Add(numRoutines) // create the waitgroup
    for i := 0; i < numRoutines; i++ {
        go cnt.doStuff(&wg, i)
    }
    wg.Wait() // wait for all routines to finish
    fmt.Printf("Counted %d\n", atomic.LoadInt64(&cnt.words))
}

Fixing your third solution

As I mentioned in the comments: your third solution still causes a race condition because channel c is never closed, which means the routine:

go func() {
    for i := range c {
        countValue += i
    }
}()

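never returns. The waitgroup also only ensures that all values have been sent on the channel, not that countValue has been incremented to its final value. The fix is to close the channel after wg.Wait() returns, so that the routine can return, and to add a done channel that the routine closes when it returns, plus a <-done statement before returning: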
func count(numRoutines int) (countValue int) {
    var wg sync.WaitGroup

    c := make(chan int)

    k := func(i int) {
        defer wg.Done()
        c <- 5
    }
    for i := 0; i < numRoutines; i++ {
        wg.Add(1)
        go k(i)
    }

    done := make(chan struct{})
    go func() {
        for i := range c {
            countValue += i
        }
        close(done)
    }()

    wg.Wait()
    close(c)
    <-done
    return
}

This adds some clutter, though, and IMO is a bit messy. It is probably easier to move the wg.Wait() call into a routine:

func count(numRoutines int) (countValue int) {
    var wg sync.WaitGroup

    c := make(chan int)

    // add wg as argument, makes it easier to move this function outside of this scope
    k := func(wg *sync.WaitGroup, i int) {
        defer wg.Done()
        c <- 5
    }
    wg.Add(numRoutines) // increment the waitgroup once
    for i := 0; i < numRoutines; i++ {
        go k(&wg, i)
    }

    go func() {
        wg.Wait()
        close(c) // this ends the loop over the channel
    }()
    // just iterate over the channel until it is closed
    for i := range c {
        countValue += i
    }
    // we've added all values to countValue
    return
}