在取消上下文之前无法同时使用 goroutines 来查找最大值
Unable to use goroutines concurrently to find max until context is cancelled
我已经成功地对 compute
次调用中的 findMax
进行了没有 goroutines 的同步解决方案。
package main
import (
"context"
"fmt"
"math/rand"
"time"
)
func findMax(ctx context.Context, concurrency int) uint64 {
var (
max uint64 = 0
num uint64 = 0
)
for i := 0; i < concurrency; i++ {
num = compute()
if num > max {
max = num
}
}
return max
}
func compute() uint64 {
// NOTE: This is a MOCK implementation of the blocking operation.
time.Sleep(time.Duration(rand.Int63n(100)) * time.Millisecond)
return rand.Uint64()
}
func main() {
maxDuration := 2 * time.Second
concurrency := 10
ctx, cancel := context.WithTimeout(context.Background(), maxDuration)
defer cancel()
max := findMax(ctx, concurrency)
fmt.Println(max)
}
https://play.golang.org/p/lYXRNTDtNCI
当我尝试使用 goroutines 来使用 findMax
重复调用 compute
函数时使用尽可能多的 goroutines 直到上下文 ctx
被调用者 main
函数取消。我每次都得到 0,而不是灌浆计算函数调用的预期最大值。我尝试了不同的方法来做到这一点,但大多数时候都会陷入僵局。
package main
import (
"context"
"fmt"
"math/rand"
"time"
)
func findMax(ctx context.Context, concurrency int) uint64 {
var (
max uint64 = 0
num uint64 = 0
)
for i := 0; i < concurrency; i++ {
select {
case <- ctx.Done():
return max
default:
go func() {
num = compute()
if num > max {
max = num
}
}()
}
}
return max
}
func compute() uint64 {
// NOTE: This is a MOCK implementation of the blocking operation.
time.Sleep(time.Duration(rand.Int63n(100)) * time.Millisecond)
return rand.Uint64()
}
func main() {
maxDuration := 2 * time.Second
concurrency := 10
ctx, cancel := context.WithTimeout(context.Background(), maxDuration)
defer cancel()
max := findMax(ctx, concurrency)
fmt.Println(max)
}
您的程序有多个问题:
- 您正在生成多个在共享变量上运行的 goroutine,即
max
和 num
导致数据竞争,因为它们不受保护(例如,通过 Mutex)。
- 这里
num
被每个 worker goroutine 修改,但它应该是 worker 本地的,否则计算的数据可能会丢失(例如,一个 worker goroutine 计算出一个结果并将其存储在 num 中,但是对之后,第二个工作人员计算并替换 num 的值)。
num = compute // Should be "num := compute"
- 您没有等待每个 goroutine 完成它的计算,这可能会导致不正确的结果,因为即使没有取消上下文,也没有考虑每个 worker 的计算。使用
sync.WaitGroup
或渠道解决此问题。
这是一个示例程序,可以解决您代码中的大部分问题:
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
)
type result struct {
sync.RWMutex
max uint64
}
func findMax(ctx context.Context, workers int) uint64 {
var (
res = result{}
wg = sync.WaitGroup{}
)
for i := 0; i < workers; i++ {
select {
case <-ctx.Done():
// RLock to read res.max
res.RLock()
ret := res.max
res.RUnlock()
return ret
default:
wg.Add(1)
go func() {
defer wg.Done()
num := compute()
// Lock so that read from res.max and write
// to res.max is safe. Else, data race could
// occur.
res.Lock()
if num > res.max {
res.max = num
}
res.Unlock()
}()
}
}
// Wait for all the goroutine to finish work i.e., all
// workers are done computing and updating the max.
wg.Wait()
return res.max
}
func compute() uint64 {
rnd := rand.Int63n(100)
time.Sleep(time.Duration(rnd) * time.Millisecond)
return rand.Uint64()
}
func main() {
maxDuration := 2 * time.Second
concurrency := 10
ctx, cancel := context.WithTimeout(context.Background(), maxDuration)
defer cancel()
fmt.Println(findMax(ctx, concurrency))
}
正如@Brits 在评论中指出的那样,当上下文被取消时,请确保停止那些工作 goroutine 以停止处理(如果可能),因为它不再需要了。
我已经成功地对 compute
次调用中的 findMax
进行了没有 goroutines 的同步解决方案。
package main
import (
"context"
"fmt"
"math/rand"
"time"
)
func findMax(ctx context.Context, concurrency int) uint64 {
var (
max uint64 = 0
num uint64 = 0
)
for i := 0; i < concurrency; i++ {
num = compute()
if num > max {
max = num
}
}
return max
}
func compute() uint64 {
// NOTE: This is a MOCK implementation of the blocking operation.
time.Sleep(time.Duration(rand.Int63n(100)) * time.Millisecond)
return rand.Uint64()
}
func main() {
maxDuration := 2 * time.Second
concurrency := 10
ctx, cancel := context.WithTimeout(context.Background(), maxDuration)
defer cancel()
max := findMax(ctx, concurrency)
fmt.Println(max)
}
https://play.golang.org/p/lYXRNTDtNCI
当我尝试使用 goroutines 来使用 findMax
重复调用 compute
函数时使用尽可能多的 goroutines 直到上下文 ctx
被调用者 main
函数取消。我每次都得到 0,而不是灌浆计算函数调用的预期最大值。我尝试了不同的方法来做到这一点,但大多数时候都会陷入僵局。
package main
import (
"context"
"fmt"
"math/rand"
"time"
)
func findMax(ctx context.Context, concurrency int) uint64 {
var (
max uint64 = 0
num uint64 = 0
)
for i := 0; i < concurrency; i++ {
select {
case <- ctx.Done():
return max
default:
go func() {
num = compute()
if num > max {
max = num
}
}()
}
}
return max
}
func compute() uint64 {
// NOTE: This is a MOCK implementation of the blocking operation.
time.Sleep(time.Duration(rand.Int63n(100)) * time.Millisecond)
return rand.Uint64()
}
func main() {
maxDuration := 2 * time.Second
concurrency := 10
ctx, cancel := context.WithTimeout(context.Background(), maxDuration)
defer cancel()
max := findMax(ctx, concurrency)
fmt.Println(max)
}
您的程序有多个问题:
- 您正在生成多个在共享变量上运行的 goroutine,即
max
和num
导致数据竞争,因为它们不受保护(例如,通过 Mutex)。 - 这里
num
被每个 worker goroutine 修改,但它应该是 worker 本地的,否则计算的数据可能会丢失(例如,一个 worker goroutine 计算出一个结果并将其存储在 num 中,但是对之后,第二个工作人员计算并替换 num 的值)。
num = compute // Should be "num := compute"
- 您没有等待每个 goroutine 完成它的计算,这可能会导致不正确的结果,因为即使没有取消上下文,也没有考虑每个 worker 的计算。使用
sync.WaitGroup
或渠道解决此问题。
这是一个示例程序,可以解决您代码中的大部分问题:
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
)
type result struct {
sync.RWMutex
max uint64
}
func findMax(ctx context.Context, workers int) uint64 {
var (
res = result{}
wg = sync.WaitGroup{}
)
for i := 0; i < workers; i++ {
select {
case <-ctx.Done():
// RLock to read res.max
res.RLock()
ret := res.max
res.RUnlock()
return ret
default:
wg.Add(1)
go func() {
defer wg.Done()
num := compute()
// Lock so that read from res.max and write
// to res.max is safe. Else, data race could
// occur.
res.Lock()
if num > res.max {
res.max = num
}
res.Unlock()
}()
}
}
// Wait for all the goroutine to finish work i.e., all
// workers are done computing and updating the max.
wg.Wait()
return res.max
}
func compute() uint64 {
rnd := rand.Int63n(100)
time.Sleep(time.Duration(rnd) * time.Millisecond)
return rand.Uint64()
}
func main() {
maxDuration := 2 * time.Second
concurrency := 10
ctx, cancel := context.WithTimeout(context.Background(), maxDuration)
defer cancel()
fmt.Println(findMax(ctx, concurrency))
}
正如@Brits 在评论中指出的那样,当上下文被取消时,请确保停止那些工作 goroutine 以停止处理(如果可能),因为它不再需要了。