来自函数的递归调用以 goroutine 和惯用方式开始,以在所有 worker goroutine 完成时继续调用者

Recursive calls from function started as goroutine & Idiomatic way to continue caller when all worker goroutines finished

我正在使用 goroutines 实现一个(某种)组合回溯算法。我的问题可以表示为具有特定 degree/spread 的树,我想在其中访问每片叶子并根据所采用的路径计算结果。在给定的级别上,我想生成 goroutines 以同时处理子问题,即如果我有一个度数为 3 的树并且我想在级别 2 之后开始并发,我将生成 3*3=9 goroutines 继续处理子问题并发。

func main() {
    cRes := make(chan string, 100)
    res := []string{}
    numLevels := 5
    spread := 3
    startConcurrencyAtLevel := 2
    nTree("", numLevels, spread, startConcurrencyAtLevel, cRes)
    for {
        select {
        case r := <-cRes:
            res = append(res, r)
        case <-time.After(10 * time.Second):
            fmt.Println("Caculation timed out")
            fmt.Println(len(res), math.Pow(float64(spread), float64(numLevels)))
            return
        }
    }
}

func nTree(path string, maxLevels int, spread int, startConcurrencyAtLevel int, cRes chan string) {
    if len(path) == maxLevels {
        // some longer running task here associated with the found path, also using a lookup table
        // real problem actually returns not the path but the result if it satisfies some condition
        cRes <- path
        return
    }
    for i := 1; i <= spread; i++ {
        nextPath := path + fmt.Sprint(i)
        if len(path) == startConcurrencyAtLevel {
            go nTree(nextPath, maxLevels, spread, startConcurrencyAtLevel, cRes)
        } else {
            nTree(nextPath, maxLevels, spread, startConcurrencyAtLevel, cRes)
        }
    }
}

上面的代码有效,但是我依赖于 for select 语句超时。我正在寻找一种在所有 goroutine 完成后立即继续 main() 的方法,即所有子问题都已处理。

我已经想到了两个可能的 (unpreferred/unelegant) 解决方案:

    for {
        select {
        case q := <-cRunningRoutines:
            runningRoutines += q
            if runningRoutines == 0 {
                fmt.Println("Calculation complete")
                return res
            }
        // ...same cases as above
    }

游乐场:https://go.dev/play/p/9jzeCvl8Clj

以下问题:

  1. 从作为 goroutine 启动的函数对其自身进行递归调用是否是一种有效的方法?
  2. cRes 读取结果直到所有派生的 goroutine 完成的惯用方式是什么?我在某处读到,计算完成后通道应该关闭,但我只是想不通在这种情况下如何集成它。

对任何想法都很满意,谢谢!

正如 torek 所提到的,我在 waitgroup 完成等待后剥离了一个关闭通道的匿名函数。还需要一些逻辑来调用生成的 goroutines 的 wg.Done() 在 goroutine 生成级别 returns.

的递归之后

总的来说,我认为这是一个有用的成语(如果我错了请纠正我:))

游乐场:https://go.dev/play/p/bQjHENsZL25

func main() {
    cRes := make(chan string, 100)
    numLevels := 3
    spread := 3
    startConcurrencyAtLevel := 2
    var wg sync.WaitGroup
    nTree("", numLevels, spread, startConcurrencyAtLevel, cRes, &wg)

    go func() {
        // time.Sleep(1 * time.Second) // edit: code should work without this initial sleep
        wg.Wait()
        close(cRes)
    }()

    for r := range cRes {
        fmt.Println(r)
    }

    fmt.Println("Done!")
}

func nTree(path string, maxLevels int, spread int, startConcurrencyAtLevel int, cRes chan string, wg *sync.WaitGroup) {
    if len(path) == maxLevels {
        // some longer running task here associated with the found path
        cRes <- path
        return
    }
    for i := 1; i <= spread; i++ {
        nextPath := path + fmt.Sprint(i)
        if len(path) == startConcurrencyAtLevel {
            go nTree(nextPath, maxLevels, spread, startConcurrencyAtLevel, cRes, wg)
        } else {
            nTree(nextPath, maxLevels, spread, startConcurrencyAtLevel, cRes, wg)
        }
    }
}

阅读描述和片段我无法准确理解您想要实现的目标,但我有一些我每天使用的频道的提示和模式,我认为它们很有帮助。

  • context 包对于以安全的方式管理 goroutines 的状态非常有帮助。在您的示例中,time.After 用于结束主程序,但在 non-main 函数中它可能会泄漏 goroutines:如果您改为使用 context.Context 并将其传递到 gorotuines(通常通过函数的第一个参数),您将能够控制下游调用的取消。 This explains it briefly.

  • 通常的做法是在生成消息并在通道中发送消息的函数中创建通道(以及 return 它们)。相同的功能应该负责关闭通道,例如,在完成写入时使用 defer close(channel) 。 这很方便,因为当通道被缓冲时,即使它仍然有数据也可以关闭它:Go 运行时实际上会等到所有消息都被轮询后再关闭。对于无缓冲通道,该函数将无法通过通道发送消息,直到通道的 reader 准备好轮询它,因此无法退出。 This is an example (without recursion)。 在此示例中,我们可以 close 缓冲或未缓冲的通道,因为发送将阻塞,直到主 goroutine 中的通道上的 for := range 从中读取。 This is a variant 同样的原理,通道作为参数传递。

  • 我们可以使用 sync.WaitGroup 串联 与通道,为单个 goroutine 发出完成信号,并让“编排”goroutine 知道通道可以关闭,因为所有消息生产者都已将数据发送到通道中。与第 1 点相同的注意事项适用于 close 操作。 This is an example 显示 waitGroup 的使用和通道的外部关闭器。

  • 频道可以有方向!请注意,在示例中,我在传递 in/outside 函数时 added/removed 通道旁边的箭头(例如 <-chan stringchan<- string)。这告诉编译器,一个通道在该函数的范围内分别只读或写。 这在两个方面有所帮助:

    1. 编译器将生成更高效的代码,因为有方向的通道将有一个锁而不是 2 个。
    2. 函数的签名描述了它是否只使用通道写入(可能 close())或读取:记住从带有 range 的通道读取会自动停止通道关闭时的迭代。
  • 您可以构建通道的通道:make(chan chan string) 是构建处理管道的有效(有用)结构。 它的一个常见用法是 fan-in goroutine 收集一系列 channel-producing goroutines 的多个输出。 This is an example 如何使用它们。

本质上,回答您最初的问题:

Is doing recursive calls from a function started as a goroutine to itself a valid approach?

如果你真的需要递归,最好将它与并发代码分开处理:创建一个专用函数递归地将数据发送到通道,并协调关闭来电者的频道。

What would be an idiomatic way of reading the results from cRes until all spawned goroutines finish? I read somewhere that channels should be closed when computation is done, but I just cant wrap my head around how to integrate it in this case.

一个很好的参考是 Go Concurrency Patterns: Pipelines and cancellation: this is a rather old post (before the context package existedin the std lib) and I think Parallel digestion 是您正在寻找的解决原始问题的参考。