在取消上下文之前无法同时使用 goroutines 来查找最大值

Unable to use goroutines concurrently to find max until context is cancelled

我已经成功地对 compute 次调用中的 findMax 进行了没有 goroutines 的同步解决方案。

package main

import (
    "context"
    "fmt"
    "math/rand"
    "time"
)

func findMax(ctx context.Context, concurrency int) uint64 {
    var (
        max uint64 = 0
        num uint64 = 0
    )

    for i := 0; i < concurrency; i++ {
        num = compute()

        if num > max {
            max = num
        }
    }

    return max
}

func compute() uint64 {
    // NOTE: This is a MOCK implementation of the blocking operation.
    
    time.Sleep(time.Duration(rand.Int63n(100)) * time.Millisecond)
    return rand.Uint64()
}

func main() {
    maxDuration := 2 * time.Second
    concurrency := 10

    ctx, cancel := context.WithTimeout(context.Background(), maxDuration)
    defer cancel()

    max := findMax(ctx, concurrency)
    fmt.Println(max)
}


https://play.golang.org/p/lYXRNTDtNCI

当我尝试使用 goroutines 来使用 findMax 重复调用 compute 函数时使用尽可能多的 goroutines 直到上下文 ctx 被调用者 main 函数取消。我每次都得到 0,而不是灌浆计算函数调用的预期最大值。我尝试了不同的方法来做到这一点,但大多数时候都会陷入僵局。

package main

import (
    "context"
    "fmt"
    "math/rand"
    "time"
)

func findMax(ctx context.Context, concurrency int) uint64 {
    var (
        max uint64 = 0
        num uint64 = 0
    )

    for i := 0; i < concurrency; i++ {
        select {
        case <- ctx.Done():
            return max
        default:
            go func() {
                num = compute()
                if num > max {
                    max = num
                }
            }()
        }
    }

    return max
}

func compute() uint64 {
    // NOTE: This is a MOCK implementation of the blocking operation.
    
    time.Sleep(time.Duration(rand.Int63n(100)) * time.Millisecond)
    return rand.Uint64()
}

func main() {
    maxDuration := 2 * time.Second
    concurrency := 10

    ctx, cancel := context.WithTimeout(context.Background(), maxDuration)
    defer cancel()

    max := findMax(ctx, concurrency)
    fmt.Println(max)
}

https://play.golang.org/p/3fFFq2xlXAE

您的程序有多个问题:

  1. 您正在生成多个在共享变量上运行的 goroutine,即 maxnum 导致数据竞争,因为它们不受保护(例如,通过 Mutex)。
  2. 这里 num 被每个 worker goroutine 修改,但它应该是 worker 本地的,否则计算的数据可能会丢失(例如,一个 worker goroutine 计算出一个结果并将其存储在 num 中,但是对之后,第二个工作人员计算并替换 num 的值)。
 num = compute // Should be "num := compute"
  1. 您没有等待每个 goroutine 完成它的计算,这可能会导致不正确的结果,因为即使没有取消上下文,也没有考虑每个 worker 的计算。使用 sync.WaitGroup 或渠道解决此问题。

这是一个示例程序,可以解决您代码中的大部分问题:

package main

import (
    "context"
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type result struct {
    sync.RWMutex
    max uint64
}

func findMax(ctx context.Context, workers int) uint64 {
    var (
        res = result{}
        wg  = sync.WaitGroup{}
    )

    for i := 0; i < workers; i++ {
        select {
        case <-ctx.Done():
            // RLock to read res.max
            res.RLock()
            ret := res.max
            res.RUnlock()
            return ret
        default:
            wg.Add(1)
            go func() {
                defer wg.Done()
                num := compute()

                // Lock so that read from res.max and write
                // to res.max is safe. Else, data race could
                // occur.
                res.Lock()
                if num > res.max {
                    res.max = num
                }
                res.Unlock()
            }()
        }
    }

    // Wait for all the goroutine to finish work i.e., all
    // workers are done computing and updating the max.
    wg.Wait()

    return res.max
}

func compute() uint64 {
    rnd := rand.Int63n(100)
    time.Sleep(time.Duration(rnd) * time.Millisecond)
    return rand.Uint64()
}

func main() {
    maxDuration := 2 * time.Second
    concurrency := 10

    ctx, cancel := context.WithTimeout(context.Background(), maxDuration)
    defer cancel()

    fmt.Println(findMax(ctx, concurrency))
}

正如@Brits 在评论中指出的那样,当上下文被取消时,请确保停止那些工作 goroutine 以停止处理(如果可能),因为它不再需要了。