`all goroutines are sleep - deadlock!` 我不明白为什么
A case of `all goroutines are asleep - deadlock!` I can't figure out why
TL;DR:典型案例all goroutines are asleep, deadlock!
却想不通
我正在解析 Wiktionary XML 转储以构建单词数据库。我将每篇文章的文本解析推迟到 goroutine,希望它能加快这个过程。
它是 7GB,在我的机器上连续处理时不到 2 分钟,但如果我可以利用所有内核,为什么不呢。
我是线程处理的新手,我遇到了 all goroutines are asleep, deadlock!
错误。
这里出了什么问题?
这可能根本不高效,因为它使用无缓冲通道,所以所有 goroutines 最终都有效地串行执行,但我的想法是学习和理解线程,并用不同的替代方案来衡量它需要多长时间:
- 无缓冲通道
- 不同大小的缓冲通道
- 一次只调用与
runtime.NumCPU()
一样多的 goroutine
我的伪代码代码摘要:
while tag := xml.getNextTag() {
wg.Add(1)
go parseTagText(chan, wg, tag.text)
// consume a channel message if available
select {
case msg := <-chan:
// do something with msg
default:
}
}
// reading tags finished, wait for running goroutines, consume what's left on the channel
for msg := range chan {
// do something with msg
}
// Sometimes this point is never reached, I get a deadlock
wg.Wait()
----
func parseTagText(chan, wg, tag.text) {
defer wg.Done()
// parse tag.text
chan <- whatever // just inform that the text has been parsed
}
在 Go Playground 上的完整示例中,您:
创建一个通道(第 39 行,results := make(chan langs)
)和一个等待组(第 40 行,var wait sync.WaitGroup
)。到目前为止一切顺利。
循环:在循环中,有时会分拆一个任务:
if ...various conditions... {
wait.Add(1)
go parseTerm(results, &wait, text)
}
在循环中,有时会从通道进行非阻塞读取(如您的问题所示)。这里也没有问题。但是...
循环结束时,使用:
for res := range results {
...
}
without 曾经在一个地方调用 close(results)
,在所有作者完成之后。此循环使用从通道读取的 blocking。只要某个 writer goroutine 还在运行,阻塞读就可以阻塞而不用让整个系统停止,但是当最后一个 writer 写完退出时,就没有剩余的 writer goroutines 了。任何 other 个剩余的 goroutine 可能会拯救你,但还有 none.
由于你正确使用了var wait
(在writer中正确的地方加了1,在正确的地方调用了Done()
),解决方法是多加一个goroutine,这样会成为拯救你的人:
go func() {
wait.Wait()
close(results)
}()
你应该在进入 for res := range results
循环之前分离这个救援 goroutine。 (如果您更早地将其关闭,它可能会看到 wait
变量过快地计数到零,就在它通过关闭另一个 parseTerm
再次计数之前。)
这个匿名函数将阻塞在 wait
变量的 Wait()
函数中,直到最后一个 writer goroutine 调用了最终的 wait.Done()
,这将解除阻塞 this 协程。然后这个 goroutine 将调用 close(results)
,这将安排 main
goroutine 中的 for
循环完成,解除该 goroutine 的阻塞。当这个 goroutine(救援者)returns 终止时,不再有救援者,但我们不再需要任何救援者。
(此主代码不必要地调用 wait.Wait()
:因为 for
直到 new goroutine 中的 wait.Wait()
才终止已经解除阻塞,我们知道下一个 wait.Wait()
将立即 return。所以我们可以放弃第二个调用,尽管保留它是无害的。)
问题是什么都没有关闭结果通道,但是范围循环只有在关闭时才会退出。我已经简化了你的代码来说明这一点并提出了一个解决方案 - 基本上在 goroutine 中使用数据:
// This is our producer
func foo(i int, ch chan int, wg *sync.WaitGroup) {
defer wg.Done()
ch <- i
fmt.Println(i, "done")
}
// This is our consumer - it uses a different WG to signal it's done
func consumeData(ch chan int, wg *sync.WaitGroup) {
defer wg.Done()
for x := range ch {
fmt.Println(x)
}
fmt.Println("ALL DONE")
}
func main() {
ch := make(chan int)
wg := sync.WaitGroup{}
// create the producers
for i := 0; i < 10; i++ {
wg.Add(1)
go foo(i, ch, &wg)
}
// create the consumer on a different goroutine, and sync using another WG
consumeWg := sync.WaitGroup{}
consumeWg.Add(1)
go consumeData(ch,&consumeWg)
wg.Wait() // <<<< means that the producers are done
close(ch) // << Signal the consumer to exit
consumeWg.Wait() // << Wait for the consumer to exit
}
TL;DR:典型案例all goroutines are asleep, deadlock!
却想不通
我正在解析 Wiktionary XML 转储以构建单词数据库。我将每篇文章的文本解析推迟到 goroutine,希望它能加快这个过程。
它是 7GB,在我的机器上连续处理时不到 2 分钟,但如果我可以利用所有内核,为什么不呢。
我是线程处理的新手,我遇到了 all goroutines are asleep, deadlock!
错误。
这里出了什么问题?
这可能根本不高效,因为它使用无缓冲通道,所以所有 goroutines 最终都有效地串行执行,但我的想法是学习和理解线程,并用不同的替代方案来衡量它需要多长时间:
- 无缓冲通道
- 不同大小的缓冲通道
- 一次只调用与
runtime.NumCPU()
一样多的 goroutine
我的伪代码代码摘要:
while tag := xml.getNextTag() {
wg.Add(1)
go parseTagText(chan, wg, tag.text)
// consume a channel message if available
select {
case msg := <-chan:
// do something with msg
default:
}
}
// reading tags finished, wait for running goroutines, consume what's left on the channel
for msg := range chan {
// do something with msg
}
// Sometimes this point is never reached, I get a deadlock
wg.Wait()
----
func parseTagText(chan, wg, tag.text) {
defer wg.Done()
// parse tag.text
chan <- whatever // just inform that the text has been parsed
}
在 Go Playground 上的完整示例中,您:
创建一个通道(第 39 行,
results := make(chan langs)
)和一个等待组(第 40 行,var wait sync.WaitGroup
)。到目前为止一切顺利。循环:在循环中,有时会分拆一个任务:
if ...various conditions... { wait.Add(1) go parseTerm(results, &wait, text) }
在循环中,有时会从通道进行非阻塞读取(如您的问题所示)。这里也没有问题。但是...
循环结束时,使用:
for res := range results { ... }
without 曾经在一个地方调用
close(results)
,在所有作者完成之后。此循环使用从通道读取的 blocking。只要某个 writer goroutine 还在运行,阻塞读就可以阻塞而不用让整个系统停止,但是当最后一个 writer 写完退出时,就没有剩余的 writer goroutines 了。任何 other 个剩余的 goroutine 可能会拯救你,但还有 none.
由于你正确使用了var wait
(在writer中正确的地方加了1,在正确的地方调用了Done()
),解决方法是多加一个goroutine,这样会成为拯救你的人:
go func() {
wait.Wait()
close(results)
}()
你应该在进入 for res := range results
循环之前分离这个救援 goroutine。 (如果您更早地将其关闭,它可能会看到 wait
变量过快地计数到零,就在它通过关闭另一个 parseTerm
再次计数之前。)
这个匿名函数将阻塞在 wait
变量的 Wait()
函数中,直到最后一个 writer goroutine 调用了最终的 wait.Done()
,这将解除阻塞 this 协程。然后这个 goroutine 将调用 close(results)
,这将安排 main
goroutine 中的 for
循环完成,解除该 goroutine 的阻塞。当这个 goroutine(救援者)returns 终止时,不再有救援者,但我们不再需要任何救援者。
(此主代码不必要地调用 wait.Wait()
:因为 for
直到 new goroutine 中的 wait.Wait()
才终止已经解除阻塞,我们知道下一个 wait.Wait()
将立即 return。所以我们可以放弃第二个调用,尽管保留它是无害的。)
问题是什么都没有关闭结果通道,但是范围循环只有在关闭时才会退出。我已经简化了你的代码来说明这一点并提出了一个解决方案 - 基本上在 goroutine 中使用数据:
// This is our producer
func foo(i int, ch chan int, wg *sync.WaitGroup) {
defer wg.Done()
ch <- i
fmt.Println(i, "done")
}
// This is our consumer - it uses a different WG to signal it's done
func consumeData(ch chan int, wg *sync.WaitGroup) {
defer wg.Done()
for x := range ch {
fmt.Println(x)
}
fmt.Println("ALL DONE")
}
func main() {
ch := make(chan int)
wg := sync.WaitGroup{}
// create the producers
for i := 0; i < 10; i++ {
wg.Add(1)
go foo(i, ch, &wg)
}
// create the consumer on a different goroutine, and sync using another WG
consumeWg := sync.WaitGroup{}
consumeWg.Add(1)
go consumeData(ch,&consumeWg)
wg.Wait() // <<<< means that the producers are done
close(ch) // << Signal the consumer to exit
consumeWg.Wait() // << Wait for the consumer to exit
}