X 个 goroutines 来更新同一个变量
X number of goroutines to update the same variable
我想让 X 数量的 goroutines 使用并行更新 CountValue
(numRoutines 与你有多少 CPU 一样多)。
解决方案 1:
func count(numRoutines int) (countValue int) {
var mu sync.Mutex
k := func(i int) {
mu.Lock()
defer mu.Unlock()
countValue += 5
}
for i := 0; i < numRoutines; i++ {
go k(i)
}
它变成了数据竞争,返回的countValue = 0
。
解决方案 2:
func count(numRoutines int) (countValue int) {
k := func(i int, c chan int) {
c <- 5
}
c := make(chan int)
for i := 0; i < numRoutines; i++ {
go k(i, c)
}
for i := 0; i < numRoutines; i++ {
countValue += <- c
}
return
}
我对它进行了基准测试,顺序加法比使用 goroutines 更快。我认为这是因为我在这里有两个 for 循环,因为当我将 countValue += <- c
放在第一个 for 循环中时,代码运行得更快。
解决方案 3:
func count(numRoutines int) (countValue int) {
var wg sync.WaitGroup
c := make(chan int)
k := func(i int) {
defer wg.Done()
c <- 5
}
for i := 0; i < numShards; i++ {
wg.Add(1)
go k(i)
}
go func() {
for i := range c {
countValue += i
}
}()
wg.Wait()
return
}
仍然是比赛计数:/
有什么更好的办法吗?
绝对有更好的方法来安全地增加变量:使用 sync/atomic
:
import "sync/atomic"
var words int64
k := func() {
_ = atomic.AddInt64(&words, 5) // increment atomically
}
使用通道基本上消除了对互斥体的需要,或者消除了并发访问变量本身的风险,这里的等待组有点矫枉过正
频道:
words := 0
done := make(chan struct{}) // or use context
ch := make(chan int, numRoutines) // buffer so each routine can write
go func () {
read := 0
for i := range ch {
words += 5 // or use i or something
read++
if read == numRoutines {
break // we've received data from all routines
}
}
close(done) // indicate this routine has terminated
}()
for i := 0; i < numRoutines; i++ {
ch <- i // write whatever value needs to be used in the counting routine on the channel
}
<- done // wait for our routine that increments words to return
close(ch) // this channel is no longer needed
fmt.Printf("Counted %d\n", words)
如您所见,numRoutines
不再是例程数,而是通道上的写入数。您可以将其移动到单独的例程中,仍然:
for i := 0; i < numRoutines; i++ {
go func(ch chan<- int, i int) {
// do stuff here
ch <- 5 * i // for example
}(ch, i)
}
等待组:
您可以使用等待组 + 原子来代替使用可以取消的上下文或通道来获得相同的结果。 IMO 最简单的方法是创建一个类型:
type counter struct {
words int64
}
func (c *counter) doStuff(wg *sync.WaitGroup, i int) {
defer wg.Done()
_ = atomic.AddInt64(&c.words, i * 5) // whatever value you need to add
}
func main () {
cnt := counter{}
wg := sync.WaitGroup{}
wg.Add(numRoutines) // create the waitgroup
for i := 0; i < numRoutines; i++ {
go cnt.doStuff(&wg, i)
}
wg.Wait() // wait for all routines to finish
fmt.Println("Counted %d\n", cnt.words)
}
修复你的第三个解决方案
正如我在评论中提到的:您的第三个解决方案仍然导致竞争条件,因为通道 c
从未关闭,这意味着例程:
go func () {
for i := range c {
countValue += i
}
}()
从不 return。等待组还只确保您已在通道上发送所有值,但不会确保 countValue
已递增到其最终值。解决方法是在 wg.Wait()
returns 之后关闭通道,以便例程可以 return,并添加一个 done
通道,您可以在最后一个例程 returns,并在returning.
之前添加一个<-done
语句
func count(numRoutines int) (countValue int) {
var wg sync.WaitGroup
c := make(chan int)
k := func(i int) {
defer wg.Done()
c <- 5
}
for i := 0; i < numShards; i++ {
wg.Add(1)
go k(i)
}
done := make(chan struct{})
go func() {
for i := range c {
countValue += i
}
close(done)
}()
wg.Wait()
close(c)
<-done
return
}
不过,这增加了一些混乱,IMO 有点混乱。将 wg.Wait()
调用移动到例程可能更容易:
func count(numRoutines int) (countValue int) {
var wg sync.WaitGroup
c := make(chan int)
// add wg as argument, makes it easier to move this function outside of this scope
k := func(wg *sync.WaitGroup, i int) {
defer wg.Done()
c <- 5
}
wg.Add(numShards) // increment the waitgroup once
for i := 0; i < numShards; i++ {
go k(&wg, i)
}
go func() {
wg.Wait()
close(c) // this ends the loop over the channel
}()
// just iterate over the channel until it is closed
for i := range c {
countValue += i
}
// we've added all values to countValue
return
}
我想让 X 数量的 goroutines 使用并行更新 CountValue
(numRoutines 与你有多少 CPU 一样多)。
解决方案 1:
func count(numRoutines int) (countValue int) {
var mu sync.Mutex
k := func(i int) {
mu.Lock()
defer mu.Unlock()
countValue += 5
}
for i := 0; i < numRoutines; i++ {
go k(i)
}
它变成了数据竞争,返回的countValue = 0
。
解决方案 2:
func count(numRoutines int) (countValue int) {
k := func(i int, c chan int) {
c <- 5
}
c := make(chan int)
for i := 0; i < numRoutines; i++ {
go k(i, c)
}
for i := 0; i < numRoutines; i++ {
countValue += <- c
}
return
}
我对它进行了基准测试,顺序加法比使用 goroutines 更快。我认为这是因为我在这里有两个 for 循环,因为当我将 countValue += <- c
放在第一个 for 循环中时,代码运行得更快。
解决方案 3:
func count(numRoutines int) (countValue int) {
var wg sync.WaitGroup
c := make(chan int)
k := func(i int) {
defer wg.Done()
c <- 5
}
for i := 0; i < numShards; i++ {
wg.Add(1)
go k(i)
}
go func() {
for i := range c {
countValue += i
}
}()
wg.Wait()
return
}
仍然是比赛计数:/
有什么更好的办法吗?
绝对有更好的方法来安全地增加变量:使用 sync/atomic
:
import "sync/atomic"
var words int64
k := func() {
_ = atomic.AddInt64(&words, 5) // increment atomically
}
使用通道基本上消除了对互斥体的需要,或者消除了并发访问变量本身的风险,这里的等待组有点矫枉过正
频道:
words := 0
done := make(chan struct{}) // or use context
ch := make(chan int, numRoutines) // buffer so each routine can write
go func () {
read := 0
for i := range ch {
words += 5 // or use i or something
read++
if read == numRoutines {
break // we've received data from all routines
}
}
close(done) // indicate this routine has terminated
}()
for i := 0; i < numRoutines; i++ {
ch <- i // write whatever value needs to be used in the counting routine on the channel
}
<- done // wait for our routine that increments words to return
close(ch) // this channel is no longer needed
fmt.Printf("Counted %d\n", words)
如您所见,numRoutines
不再是例程数,而是通道上的写入数。您可以将其移动到单独的例程中,仍然:
for i := 0; i < numRoutines; i++ {
go func(ch chan<- int, i int) {
// do stuff here
ch <- 5 * i // for example
}(ch, i)
}
等待组:
您可以使用等待组 + 原子来代替使用可以取消的上下文或通道来获得相同的结果。 IMO 最简单的方法是创建一个类型:
type counter struct {
words int64
}
func (c *counter) doStuff(wg *sync.WaitGroup, i int) {
defer wg.Done()
_ = atomic.AddInt64(&c.words, i * 5) // whatever value you need to add
}
func main () {
cnt := counter{}
wg := sync.WaitGroup{}
wg.Add(numRoutines) // create the waitgroup
for i := 0; i < numRoutines; i++ {
go cnt.doStuff(&wg, i)
}
wg.Wait() // wait for all routines to finish
fmt.Println("Counted %d\n", cnt.words)
}
修复你的第三个解决方案
正如我在评论中提到的:您的第三个解决方案仍然导致竞争条件,因为通道 c
从未关闭,这意味着例程:
go func () {
for i := range c {
countValue += i
}
}()
从不 return。等待组还只确保您已在通道上发送所有值,但不会确保 countValue
已递增到其最终值。解决方法是在 wg.Wait()
returns 之后关闭通道,以便例程可以 return,并添加一个 done
通道,您可以在最后一个例程 returns,并在returning.
<-done
语句
func count(numRoutines int) (countValue int) {
var wg sync.WaitGroup
c := make(chan int)
k := func(i int) {
defer wg.Done()
c <- 5
}
for i := 0; i < numShards; i++ {
wg.Add(1)
go k(i)
}
done := make(chan struct{})
go func() {
for i := range c {
countValue += i
}
close(done)
}()
wg.Wait()
close(c)
<-done
return
}
不过,这增加了一些混乱,IMO 有点混乱。将 wg.Wait()
调用移动到例程可能更容易:
func count(numRoutines int) (countValue int) {
var wg sync.WaitGroup
c := make(chan int)
// add wg as argument, makes it easier to move this function outside of this scope
k := func(wg *sync.WaitGroup, i int) {
defer wg.Done()
c <- 5
}
wg.Add(numShards) // increment the waitgroup once
for i := 0; i < numShards; i++ {
go k(&wg, i)
}
go func() {
wg.Wait()
close(c) // this ends the loop over the channel
}()
// just iterate over the channel until it is closed
for i := range c {
countValue += i
}
// we've added all values to countValue
return
}