Golang worker 使用等待组
Golang worker using wait groups
我是 Golang 的新手,正在尝试了解 Golang 中 WaitGroups 和并发的工作原理。在此示例中,创建了 5 个工人,并使用一个通道将工作传递给每个工人。让工作人员休眠 1 秒钟以模拟繁重的计算。一切顺利,但程序并没有正常退出。而是打印此错误消息。请帮助理解为什么会这样。
fatal error: all goroutines are asleep - deadlock!
这是代码,
import (
"fmt"
"sync"
"time"
)
func worker(wg *sync.WaitGroup, messageChannel <-chan string) {
defer wg.Done()
for i := range messageChannel {
time.Sleep(time.Second)
fmt.Println("done processing - ", i)
}
}
func stop(wg *sync.WaitGroup) {
fmt.Println("waiting on the main thread")
wg.Wait()
}
func main() {
wg := new(sync.WaitGroup)
messageChannel := make(chan string, 50)
// create workers
for i := 0; i < 5; i++ {
wg.Add(1)
go worker(wg, messageChannel)
}
// input some messages
for i := 0; i < 10; i++ {
messageChannel <- fmt.Sprint(i)
}
stop(wg)
close(messageChannel)
}
提前致谢!
为了扩展@Peter 的评论,这里是您编写的代码的执行流程:
- 初始化后,启动
worker
goroutines。每个 worker
goroutine 的范围将超过 messageChannel
,延迟 1 秒将打印出一条消息。
- 接下来,您通过 for 循环在
messageChannel
中插入一些消息。每个可用的 worker
goroutine 都会收到一条消息,直到所有消息都被处理并打印出来。之后,worker
goroutines 正在等待来自 messageChannel
. 的新消息
- 在
messageChannel
中插入消息的 for 循环完成后,执行 stop
函数,该函数阻塞 wg.Wait()
并等待所有 wg.Done()
在所有 worker
个协程中执行的调用。但是,由于 messageChannel
未关闭,因此 worker
个 goroutines 中的 none 个可以完成执行并执行 wg.Done()
个调用中的 none 个。
worker
goroutines 卡住了,因为 messageChannel
永远不会关闭,main
goroutine 卡住了,因为 wg.Wait()
在 stop
函数中调用,你最终会陷入僵局,所有 goroutines 都在睡觉。
根据建议,您只需交换 stop
和 close
电话的位置
//rest of the code
close(messageChannel)
stop(wg)
这样,当所有消息都插入 messageChannel
时,您调用 close(messageChannel)
,然后调用 stop(wg)
,这会阻塞 wg.Wait
调用。 close(messageChannel)
调用确保一旦从 messageChannel
读取所有消息,worker
goroutines 内 messageChannel
上的 for-range
循环将退出,并且所有 defer wg.Done()
调用将被执行。一旦发生这种情况,wg.Wait()
将解除阻塞,程序将正常完成执行。
我是 Golang 的新手,正在尝试了解 Golang 中 WaitGroups 和并发的工作原理。在此示例中,创建了 5 个工人,并使用一个通道将工作传递给每个工人。让工作人员休眠 1 秒钟以模拟繁重的计算。一切顺利,但程序并没有正常退出。而是打印此错误消息。请帮助理解为什么会这样。
fatal error: all goroutines are asleep - deadlock!
这是代码,
import (
"fmt"
"sync"
"time"
)
func worker(wg *sync.WaitGroup, messageChannel <-chan string) {
defer wg.Done()
for i := range messageChannel {
time.Sleep(time.Second)
fmt.Println("done processing - ", i)
}
}
func stop(wg *sync.WaitGroup) {
fmt.Println("waiting on the main thread")
wg.Wait()
}
func main() {
wg := new(sync.WaitGroup)
messageChannel := make(chan string, 50)
// create workers
for i := 0; i < 5; i++ {
wg.Add(1)
go worker(wg, messageChannel)
}
// input some messages
for i := 0; i < 10; i++ {
messageChannel <- fmt.Sprint(i)
}
stop(wg)
close(messageChannel)
}
提前致谢!
为了扩展@Peter 的评论,这里是您编写的代码的执行流程:
- 初始化后,启动
worker
goroutines。每个worker
goroutine 的范围将超过messageChannel
,延迟 1 秒将打印出一条消息。 - 接下来,您通过 for 循环在
messageChannel
中插入一些消息。每个可用的worker
goroutine 都会收到一条消息,直到所有消息都被处理并打印出来。之后,worker
goroutines 正在等待来自messageChannel
. 的新消息
- 在
messageChannel
中插入消息的 for 循环完成后,执行stop
函数,该函数阻塞wg.Wait()
并等待所有wg.Done()
在所有worker
个协程中执行的调用。但是,由于messageChannel
未关闭,因此worker
个 goroutines 中的 none 个可以完成执行并执行wg.Done()
个调用中的 none 个。 worker
goroutines 卡住了,因为messageChannel
永远不会关闭,main
goroutine 卡住了,因为wg.Wait()
在stop
函数中调用,你最终会陷入僵局,所有 goroutines 都在睡觉。
根据建议,您只需交换 stop
和 close
电话的位置
//rest of the code
close(messageChannel)
stop(wg)
这样,当所有消息都插入 messageChannel
时,您调用 close(messageChannel)
,然后调用 stop(wg)
,这会阻塞 wg.Wait
调用。 close(messageChannel)
调用确保一旦从 messageChannel
读取所有消息,worker
goroutines 内 messageChannel
上的 for-range
循环将退出,并且所有 defer wg.Done()
调用将被执行。一旦发生这种情况,wg.Wait()
将解除阻塞,程序将正常完成执行。