如何在处理结果时正确关闭 Goroutines 中的共享通道
How do I correctly close shared channels in Goroutines while processing results
我正在尝试找到使用 worker go 例程生成的结果的正确方法,同时在所有工作完成后优雅地退出结果循环。为了说明,我制作了以下示例。我的实际情况与这个示例略有不同,因为我不知道每个工作人员执行的例行程序 "work" 会 return 多少,显然这些 for 循环正在执行固定数量的结果 (5) .
我是 goroutines 和 channels 的新手,但以下是我理解的基本租户;
- 只有发送者才能关闭频道
- 在通道上执行
range
将继续,直到通道关闭
package main
import (
"fmt"
"sync"
)
func worker1(r chan string, wg *sync.WaitGroup) {
for i := 0; i < 5; i++ {
r <- fmt.Sprintf("1.%d", i)
}
wg.Done()
}
func worker2(r chan string, wg *sync.WaitGroup) {
for i := 0; i < 5; i++ {
r <- fmt.Sprintf("2.%d", i)
}
wg.Done()
}
func main() {
var wg sync.WaitGroup
r := make(chan string)
wg.Add(2)
go worker1(r, &wg)
go worker2(r, &wg)
for i := range r {
fmt.Printf("Got job result: %s\n", i)
}
wg.Wait()
}
这个例子是死锁的,因为范围循环永远不会退出,因为通道永远不会关闭。工作完成后,我可以在通道上执行关闭操作(即将 wg.Done()
替换为 close(r)
),但是当其他 worker goroutine 尝试将更多结果发送到已经关闭的通道时,我会感到恐慌.
最后我想我可以将 wg.Wait()
移动到结果循环之上,完成后关闭通道,然后开始打印结果,但这意味着我无法打印任何结果,直到所有工作都完成已在所有线程上完成。
在所有工作线程完成后优雅地退出结果循环,同时又不等到所有工作完成后再开始打印结果的正确方法是什么?
我已经编辑了您的代码,使其在没有死锁的情况下工作。问题是通道上的接收阻塞了主线程,你的两个 goroutines
都没有发送更多数据。
此解决方案运行一个新的 goroutine
,它会在 WaitGroup
完成后关闭结果通道。
package main
import (
"fmt"
"sync"
)
func worker1(r chan string, wg *sync.WaitGroup) {
for i := 0; i < 5; i++ {
r <- fmt.Sprintf("1.%d", i)
}
wg.Done()
}
func worker2(r chan string, wg *sync.WaitGroup) {
for i := 0; i < 5; i++ {
r <- fmt.Sprintf("2.%d", i)
}
wg.Done()
}
func main() {
var wg sync.WaitGroup
r := make(chan string)
wg.Add(2)
go worker1(r, &wg)
go worker2(r, &wg)
go func() {
defer close(r)
wg.Wait()
}()
for i := range r {
fmt.Printf("Got job result: %s\n", i)
}
}
我正在尝试找到使用 worker go 例程生成的结果的正确方法,同时在所有工作完成后优雅地退出结果循环。为了说明,我制作了以下示例。我的实际情况与这个示例略有不同,因为我不知道每个工作人员执行的例行程序 "work" 会 return 多少,显然这些 for 循环正在执行固定数量的结果 (5) .
我是 goroutines 和 channels 的新手,但以下是我理解的基本租户;
- 只有发送者才能关闭频道
- 在通道上执行
range
将继续,直到通道关闭
package main
import (
"fmt"
"sync"
)
func worker1(r chan string, wg *sync.WaitGroup) {
for i := 0; i < 5; i++ {
r <- fmt.Sprintf("1.%d", i)
}
wg.Done()
}
func worker2(r chan string, wg *sync.WaitGroup) {
for i := 0; i < 5; i++ {
r <- fmt.Sprintf("2.%d", i)
}
wg.Done()
}
func main() {
var wg sync.WaitGroup
r := make(chan string)
wg.Add(2)
go worker1(r, &wg)
go worker2(r, &wg)
for i := range r {
fmt.Printf("Got job result: %s\n", i)
}
wg.Wait()
}
这个例子是死锁的,因为范围循环永远不会退出,因为通道永远不会关闭。工作完成后,我可以在通道上执行关闭操作(即将 wg.Done()
替换为 close(r)
),但是当其他 worker goroutine 尝试将更多结果发送到已经关闭的通道时,我会感到恐慌.
最后我想我可以将 wg.Wait()
移动到结果循环之上,完成后关闭通道,然后开始打印结果,但这意味着我无法打印任何结果,直到所有工作都完成已在所有线程上完成。
在所有工作线程完成后优雅地退出结果循环,同时又不等到所有工作完成后再开始打印结果的正确方法是什么?
我已经编辑了您的代码,使其在没有死锁的情况下工作。问题是通道上的接收阻塞了主线程,你的两个 goroutines
都没有发送更多数据。
此解决方案运行一个新的 goroutine
,它会在 WaitGroup
完成后关闭结果通道。
package main
import (
"fmt"
"sync"
)
func worker1(r chan string, wg *sync.WaitGroup) {
for i := 0; i < 5; i++ {
r <- fmt.Sprintf("1.%d", i)
}
wg.Done()
}
func worker2(r chan string, wg *sync.WaitGroup) {
for i := 0; i < 5; i++ {
r <- fmt.Sprintf("2.%d", i)
}
wg.Done()
}
func main() {
var wg sync.WaitGroup
r := make(chan string)
wg.Add(2)
go worker1(r, &wg)
go worker2(r, &wg)
go func() {
defer close(r)
wg.Wait()
}()
for i := range r {
fmt.Printf("Got job result: %s\n", i)
}
}