来自函数的递归调用以 goroutine 和惯用方式开始,以在所有 worker goroutine 完成时继续调用者
Recursive calls from function started as goroutine & Idiomatic way to continue caller when all worker goroutines finished
我正在使用 goroutines 实现一个(某种)组合回溯算法。我的问题可以表示为具有特定 degree/spread 的树,我想在其中访问每片叶子并根据所采用的路径计算结果。在给定的级别上,我想生成 goroutines 以同时处理子问题,即如果我有一个度数为 3 的树并且我想在级别 2 之后开始并发,我将生成 3*3=9 goroutines 继续处理子问题并发。
func main() {
cRes := make(chan string, 100)
res := []string{}
numLevels := 5
spread := 3
startConcurrencyAtLevel := 2
nTree("", numLevels, spread, startConcurrencyAtLevel, cRes)
for {
select {
case r := <-cRes:
res = append(res, r)
case <-time.After(10 * time.Second):
fmt.Println("Caculation timed out")
fmt.Println(len(res), math.Pow(float64(spread), float64(numLevels)))
return
}
}
}
func nTree(path string, maxLevels int, spread int, startConcurrencyAtLevel int, cRes chan string) {
if len(path) == maxLevels {
// some longer running task here associated with the found path, also using a lookup table
// real problem actually returns not the path but the result if it satisfies some condition
cRes <- path
return
}
for i := 1; i <= spread; i++ {
nextPath := path + fmt.Sprint(i)
if len(path) == startConcurrencyAtLevel {
go nTree(nextPath, maxLevels, spread, startConcurrencyAtLevel, cRes)
} else {
nTree(nextPath, maxLevels, spread, startConcurrencyAtLevel, cRes)
}
}
}
上面的代码有效,但是我依赖于 for select 语句超时。我正在寻找一种在所有 goroutine 完成后立即继续 main() 的方法,即所有子问题都已处理。
我已经想到了两个可能的 (unpreferred/unelegant) 解决方案:
使用互斥锁保护的结果映射 + 等待组而不是基于通道的方法应该可以解决问题,但我很好奇是否有一个简洁的通道解决方案。
使用退出通道(int 类型)。每次生成一个 goroutine 时,退出通道都会得到一个 +1 int,每次在叶子中完成计算时,它都会得到一个 -1 int 并且调用者对这些值求和。请参阅以下代码片段,但这不是一个好的解决方案,因为它(相当明显地)遇到了我不想处理的时间问题。例如,如果第一个 goroutine 在另一个 goroutine 生成之前完成,它会过早退出。
for {
select {
case q := <-cRunningRoutines:
runningRoutines += q
if runningRoutines == 0 {
fmt.Println("Calculation complete")
return res
}
// ...same cases as above
}
游乐场:https://go.dev/play/p/9jzeCvl8Clj
以下问题:
- 从作为 goroutine 启动的函数对其自身进行递归调用是否是一种有效的方法?
- 从
cRes
读取结果直到所有派生的 goroutine 完成的惯用方式是什么?我在某处读到,计算完成后通道应该关闭,但我只是想不通在这种情况下如何集成它。
对任何想法都很满意,谢谢!
正如 torek 所提到的,我在 waitgroup 完成等待后剥离了一个关闭通道的匿名函数。还需要一些逻辑来调用生成的 goroutines 的 wg.Done()
在 goroutine 生成级别 returns.
的递归之后
总的来说,我认为这是一个有用的成语(如果我错了请纠正我:))
游乐场:https://go.dev/play/p/bQjHENsZL25
func main() {
cRes := make(chan string, 100)
numLevels := 3
spread := 3
startConcurrencyAtLevel := 2
var wg sync.WaitGroup
nTree("", numLevels, spread, startConcurrencyAtLevel, cRes, &wg)
go func() {
// time.Sleep(1 * time.Second) // edit: code should work without this initial sleep
wg.Wait()
close(cRes)
}()
for r := range cRes {
fmt.Println(r)
}
fmt.Println("Done!")
}
func nTree(path string, maxLevels int, spread int, startConcurrencyAtLevel int, cRes chan string, wg *sync.WaitGroup) {
if len(path) == maxLevels {
// some longer running task here associated with the found path
cRes <- path
return
}
for i := 1; i <= spread; i++ {
nextPath := path + fmt.Sprint(i)
if len(path) == startConcurrencyAtLevel {
go nTree(nextPath, maxLevels, spread, startConcurrencyAtLevel, cRes, wg)
} else {
nTree(nextPath, maxLevels, spread, startConcurrencyAtLevel, cRes, wg)
}
}
}
阅读描述和片段我无法准确理解您想要实现的目标,但我有一些我每天使用的频道的提示和模式,我认为它们很有帮助。
context
包对于以安全的方式管理 goroutines 的状态非常有帮助。在您的示例中,time.After
用于结束主程序,但在 non-main 函数中它可能会泄漏 goroutines:如果您改为使用 context.Context
并将其传递到 gorotuines(通常通过函数的第一个参数),您将能够控制下游调用的取消。 This explains it briefly.
通常的做法是在生成消息并在通道中发送消息的函数中创建通道(以及 return 它们)。相同的功能应该负责关闭通道,例如,在完成写入时使用 defer close(channel)
。
这很方便,因为当通道被缓冲时,即使它仍然有数据也可以关闭它:Go 运行时实际上会等到所有消息都被轮询后再关闭。对于无缓冲通道,该函数将无法通过通道发送消息,直到通道的 reader 准备好轮询它,因此无法退出。
This is an example (without recursion)。
在此示例中,我们可以 close
缓冲或未缓冲的通道,因为发送将阻塞,直到主 goroutine 中的通道上的 for := range
从中读取。
This is a variant 同样的原理,通道作为参数传递。
我们可以使用 sync.WaitGroup
串联 与通道,为单个 goroutine 发出完成信号,并让“编排”goroutine 知道通道可以关闭,因为所有消息生产者都已将数据发送到通道中。与第 1 点相同的注意事项适用于 close
操作。
This is an example 显示 waitGroup 的使用和通道的外部关闭器。
频道可以有方向!请注意,在示例中,我在传递 in/outside 函数时 added/removed 通道旁边的箭头(例如 <-chan string
或 chan<- string
)。这告诉编译器,一个通道在该函数的范围内分别只读或写。
这在两个方面有所帮助:
- 编译器将生成更高效的代码,因为有方向的通道将有一个锁而不是 2 个。
- 函数的签名描述了它是否只使用通道写入(可能
close()
)或读取:记住从带有 range
的通道读取会自动停止通道关闭时的迭代。
您可以构建通道的通道:make(chan chan string)
是构建处理管道的有效(有用)结构。
它的一个常见用法是 fan-in goroutine 收集一系列 channel-producing goroutines 的多个输出。
This is an example 如何使用它们。
本质上,回答您最初的问题:
Is doing recursive calls from a function started as a goroutine to itself a valid approach?
如果你真的需要递归,最好将它与并发代码分开处理:创建一个专用函数递归地将数据发送到通道,并协调关闭来电者的频道。
What would be an idiomatic way of reading the results from cRes until all spawned goroutines finish? I read somewhere that channels should be closed when computation is done, but I just cant wrap my head around how to integrate it in this case.
一个很好的参考是 Go Concurrency Patterns: Pipelines and cancellation: this is a rather old post (before the context
package existedin the std lib) and I think Parallel digestion
是您正在寻找的解决原始问题的参考。
我正在使用 goroutines 实现一个(某种)组合回溯算法。我的问题可以表示为具有特定 degree/spread 的树,我想在其中访问每片叶子并根据所采用的路径计算结果。在给定的级别上,我想生成 goroutines 以同时处理子问题,即如果我有一个度数为 3 的树并且我想在级别 2 之后开始并发,我将生成 3*3=9 goroutines 继续处理子问题并发。
func main() {
cRes := make(chan string, 100)
res := []string{}
numLevels := 5
spread := 3
startConcurrencyAtLevel := 2
nTree("", numLevels, spread, startConcurrencyAtLevel, cRes)
for {
select {
case r := <-cRes:
res = append(res, r)
case <-time.After(10 * time.Second):
fmt.Println("Caculation timed out")
fmt.Println(len(res), math.Pow(float64(spread), float64(numLevels)))
return
}
}
}
func nTree(path string, maxLevels int, spread int, startConcurrencyAtLevel int, cRes chan string) {
if len(path) == maxLevels {
// some longer running task here associated with the found path, also using a lookup table
// real problem actually returns not the path but the result if it satisfies some condition
cRes <- path
return
}
for i := 1; i <= spread; i++ {
nextPath := path + fmt.Sprint(i)
if len(path) == startConcurrencyAtLevel {
go nTree(nextPath, maxLevels, spread, startConcurrencyAtLevel, cRes)
} else {
nTree(nextPath, maxLevels, spread, startConcurrencyAtLevel, cRes)
}
}
}
上面的代码有效,但是我依赖于 for select 语句超时。我正在寻找一种在所有 goroutine 完成后立即继续 main() 的方法,即所有子问题都已处理。
我已经想到了两个可能的 (unpreferred/unelegant) 解决方案:
使用互斥锁保护的结果映射 + 等待组而不是基于通道的方法应该可以解决问题,但我很好奇是否有一个简洁的通道解决方案。
使用退出通道(int 类型)。每次生成一个 goroutine 时,退出通道都会得到一个 +1 int,每次在叶子中完成计算时,它都会得到一个 -1 int 并且调用者对这些值求和。请参阅以下代码片段,但这不是一个好的解决方案,因为它(相当明显地)遇到了我不想处理的时间问题。例如,如果第一个 goroutine 在另一个 goroutine 生成之前完成,它会过早退出。
for {
select {
case q := <-cRunningRoutines:
runningRoutines += q
if runningRoutines == 0 {
fmt.Println("Calculation complete")
return res
}
// ...same cases as above
}
游乐场:https://go.dev/play/p/9jzeCvl8Clj
以下问题:
- 从作为 goroutine 启动的函数对其自身进行递归调用是否是一种有效的方法?
- 从
cRes
读取结果直到所有派生的 goroutine 完成的惯用方式是什么?我在某处读到,计算完成后通道应该关闭,但我只是想不通在这种情况下如何集成它。
对任何想法都很满意,谢谢!
正如 torek 所提到的,我在 waitgroup 完成等待后剥离了一个关闭通道的匿名函数。还需要一些逻辑来调用生成的 goroutines 的 wg.Done()
在 goroutine 生成级别 returns.
总的来说,我认为这是一个有用的成语(如果我错了请纠正我:))
游乐场:https://go.dev/play/p/bQjHENsZL25
func main() {
cRes := make(chan string, 100)
numLevels := 3
spread := 3
startConcurrencyAtLevel := 2
var wg sync.WaitGroup
nTree("", numLevels, spread, startConcurrencyAtLevel, cRes, &wg)
go func() {
// time.Sleep(1 * time.Second) // edit: code should work without this initial sleep
wg.Wait()
close(cRes)
}()
for r := range cRes {
fmt.Println(r)
}
fmt.Println("Done!")
}
func nTree(path string, maxLevels int, spread int, startConcurrencyAtLevel int, cRes chan string, wg *sync.WaitGroup) {
if len(path) == maxLevels {
// some longer running task here associated with the found path
cRes <- path
return
}
for i := 1; i <= spread; i++ {
nextPath := path + fmt.Sprint(i)
if len(path) == startConcurrencyAtLevel {
go nTree(nextPath, maxLevels, spread, startConcurrencyAtLevel, cRes, wg)
} else {
nTree(nextPath, maxLevels, spread, startConcurrencyAtLevel, cRes, wg)
}
}
}
阅读描述和片段我无法准确理解您想要实现的目标,但我有一些我每天使用的频道的提示和模式,我认为它们很有帮助。
context
包对于以安全的方式管理 goroutines 的状态非常有帮助。在您的示例中,time.After
用于结束主程序,但在 non-main 函数中它可能会泄漏 goroutines:如果您改为使用context.Context
并将其传递到 gorotuines(通常通过函数的第一个参数),您将能够控制下游调用的取消。 This explains it briefly.通常的做法是在生成消息并在通道中发送消息的函数中创建通道(以及 return 它们)。相同的功能应该负责关闭通道,例如,在完成写入时使用
defer close(channel)
。 这很方便,因为当通道被缓冲时,即使它仍然有数据也可以关闭它:Go 运行时实际上会等到所有消息都被轮询后再关闭。对于无缓冲通道,该函数将无法通过通道发送消息,直到通道的 reader 准备好轮询它,因此无法退出。 This is an example (without recursion)。 在此示例中,我们可以close
缓冲或未缓冲的通道,因为发送将阻塞,直到主 goroutine 中的通道上的for := range
从中读取。 This is a variant 同样的原理,通道作为参数传递。我们可以使用
sync.WaitGroup
串联 与通道,为单个 goroutine 发出完成信号,并让“编排”goroutine 知道通道可以关闭,因为所有消息生产者都已将数据发送到通道中。与第 1 点相同的注意事项适用于close
操作。 This is an example 显示 waitGroup 的使用和通道的外部关闭器。频道可以有方向!请注意,在示例中,我在传递 in/outside 函数时 added/removed 通道旁边的箭头(例如
<-chan string
或chan<- string
)。这告诉编译器,一个通道在该函数的范围内分别只读或写。 这在两个方面有所帮助:- 编译器将生成更高效的代码,因为有方向的通道将有一个锁而不是 2 个。
- 函数的签名描述了它是否只使用通道写入(可能
close()
)或读取:记住从带有range
的通道读取会自动停止通道关闭时的迭代。
您可以构建通道的通道:
make(chan chan string)
是构建处理管道的有效(有用)结构。 它的一个常见用法是 fan-in goroutine 收集一系列 channel-producing goroutines 的多个输出。 This is an example 如何使用它们。
本质上,回答您最初的问题:
Is doing recursive calls from a function started as a goroutine to itself a valid approach?
如果你真的需要递归,最好将它与并发代码分开处理:创建一个专用函数递归地将数据发送到通道,并协调关闭来电者的频道。
What would be an idiomatic way of reading the results from cRes until all spawned goroutines finish? I read somewhere that channels should be closed when computation is done, but I just cant wrap my head around how to integrate it in this case.
一个很好的参考是 Go Concurrency Patterns: Pipelines and cancellation: this is a rather old post (before the context
package existedin the std lib) and I think Parallel digestion
是您正在寻找的解决原始问题的参考。