解决 goroutines 死锁
Solving goroutines deadlock
我一直在尝试解决我在Golang并发中遇到的这个简单问题。我一直在搜索所有可能的解决方案,但没有发现任何特定于我的问题(或者我可能会错过一个)。这是我的代码:
package main
import (
"fmt"
"time"
)
func producer(ch chan int, d time.Duration, num int) {
for i:=0; i<num; i++ {
ch <- i
time.Sleep(d)
}
}
func main() {
ch := make(chan int)
go producer(ch, 100*time.Millisecond, 2)
go producer(ch, 200*time.Millisecond, 5)
for {
fmt.Println(<-ch)
}
close(ch)
}
它打印错误:
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive]:
main.main()
D:/Code/go/src/testconcurrency/main.go:23 +0xca
exit status 2
避免此错误的有效方法是什么?谢谢。
问题是 <-ch
正在阻塞,因此如果您不向通道添加任何新值,它将永远阻塞。一种方法是用一个开关 select 替换它,该开关也会阻塞但允许在多个频道上收听。您还必须添加退出通道。在您的示例中,一旦退出通道收到两个值,我们就可以中断。 break 语句需要一个标签,因为我们想退出 switch 和 for 循环。
https://play.golang.org/p/wGdCulZDnrx
另一种方法是有多个输入通道,并在发送完成后立即关闭它们。为此,每个 goroutine 都需要有自己的通道,否则我们将在第一个 goroutine 完成时退出。
第三个选项是创建一个合并函数,将多个通道合并为一个。这允许将通道的创建移动到生产者中,因此它们在一个位置创建、填充和关闭。 merge函数比较复杂,但是已经脱离了业务逻辑代码,可以单独理解和测试。然后主要代码被简化为:
ch1 := producer(100*time.Millisecond, 2)
ch2 := producer(200*time.Millisecond, 5)
for i := range merge(ch1, ch2) {
fmt.Println(i)
}
你有 "short-lived" 的生产者,他们只在有限的时间内在通道上发送值,并且你有一个无休止的 for
循环,它无休止地从通道接收值,没有终止条件,并且通道仅在此无限循环后关闭。一旦生产者停止发送值,就会陷入僵局。
通道必须由生产者关闭,表示不再发送任何值。由于你有多个不同步的生产者(生产者之间不同步),一般你无法判断哪个先完成,所以你不能指定一个关闭通道(而且一个通道只能关闭一次, 请参阅 ; and ).
你必须"coordinate"生产者,当所有人都完成他们的工作时,协调者应该关闭频道。
并且消费者应该在通道上使用 for range
,因为 for range
构造从通道接收在通道关闭之前发送到它的所有值,然后它会自动终止。
协调建议使用sync.WaitGroup
. Whether you use a global one in this case or a local one and you pass it to producers is up to you. Using a local will make the solution more general and easier to extend. One thing to note is that you must pass a pointer to sync.WaitGroup
. Whenever you spin up a new producer, increment the waitgroup using WaitGroup.Add()
. When a producer is done, it can signal this using WaitGroup.Done()
, preferably using defer
(so it runs no matter what, mitigating the deadlock in case of abnormal circumstances). And the controller can wait for all producers to finish using WaitGroup.Wait()
。
这是一个完整的解决方案:
func producer(ch chan int, d time.Duration, num int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < num; i++ {
ch <- i
time.Sleep(d)
}
}
func main() {
wg := &sync.WaitGroup{}
ch := make(chan int)
wg.Add(1)
go producer(ch, 100*time.Millisecond, 2, wg)
wg.Add(1)
go producer(ch, 200*time.Millisecond, 5, wg)
go func() {
wg.Wait()
close(ch)
}()
for v := range ch {
fmt.Println(v)
}
}
输出(在 Go Playground 上尝试):
0
0
1
1
2
3
4
参见相关问题:
这个问题可以使用两个等待组以优雅的方式解决。通过关闭通道 ch
,我们向消费者发出没有更多数据的信号。
解决方案适用于更多消费者。
package main
import (
"fmt"
"sync"
"time"
)
func producer(ch chan<- int, d time.Duration, num int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < num; i++ {
ch <- i
time.Sleep(d)
}
}
func consumer(ch <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for x := range ch {
fmt.Println(x)
}
}
func main() {
ch := make(chan int)
producers := &sync.WaitGroup{}
consumers := &sync.WaitGroup{}
producers.Add(2)
go producer(ch, 100*time.Millisecond, 2, producers)
go producer(ch, 200*time.Millisecond, 5, producers)
consumers.Add(1)
go consumer(ch, consumers)
producers.Wait()
close(ch)
consumers.Wait()
}
更简单的答案-其中一个生产者需要关闭通道,而消费者可以在通道上进行范围。
package main
import (
"fmt"
"time"
)
func producer(ch chan int, d time.Duration, num int, closer bool) {
for i:=0; i<num; i++ {
ch <- i
time.Sleep(d)
}
if closer {
close(ch)
}
}
func main() {
ch := make(chan int)
go producer(ch, 100*time.Millisecond, 2, false)
go producer(ch, 200*time.Millisecond, 5, true)
for i := range ch {
fmt.Println(i)
}
}
当然,除非您知道哪个生产者总是最后完成,否则您不会希望在实际代码中执行此操作。更好的设计在其他答案的 WaitGroup-based 模式中。但这是这段代码避免死锁的最简单方法。
您需要同步您的 goroutine 中的所有异步进程。您的主线程和 goroutine 线程不是同步进程。你的主线程永远不知道什么时候停止从 goroutines 调用通道。由于你的主线程在通道上循环,它总是从通道调用值,当 goroutines 完成并且通道停止发送值时,你的主线程无法从通道获取更多值,因此情况变成死锁。为避免这种情况,请使用 sync.WaitGroup
同步异步过程。
代码如下:
package main
import (
"fmt"
"time"
"sync"
)
func producer(ch chan int, d time.Duration, num int, wg *sync.WaitGroup) {
for i:=0; i<num; i++ {
ch <- i;
time.Sleep(d);
}
defer wg.Done();
}
func main() {
wg := &sync.WaitGroup{}
ch := make(chan int);
wg.Add(2);
go producer(ch, 100*time.Millisecond, 2, wg);
go producer(ch, 200*time.Millisecond, 5, wg);
go func() {
wg.Wait()
close(ch)
}()
// print the outputs
for i:= range ch {
fmt.Println(i);
}
}
https://play.golang.org/p/euMTGTIs83g
希望对您有所帮助。
由于我的解决方案看起来与已经回答的有点相似,所以我在修改之前将其更改为原始答案以适应 OP 问题。
代码如下:
package main
import (
"fmt"
"time"
"sync"
)
// producer produce values tobe sent to consumer
func producer(ch chan int, d time.Duration, num int, wg *sync.WaitGroup) {
defer wg.Done();
for i:=0; i<num; i++ {
ch <- i;
time.Sleep(d);
}
}
// consumer consume all values from producers
func consumer(ch chan int, out chan int, wg *sync.WaitGroup) {
defer wg.Done();
for i:= range ch {
out <- i
}
}
// synchronizer synchronize all goroutines to avoid deadlocks
func synchronizer(ch chan int, out chan int, wgp *sync.WaitGroup, wgc *sync.WaitGroup) {
wgp.Wait()
close(ch)
wgc.Wait()
close(out)
}
func main() {
wgp := &sync.WaitGroup{}
wgc := &sync.WaitGroup{}
ch := make(chan int);
out := make(chan int);
wgp.Add(2);
go producer(ch, 100*time.Millisecond, 2, wgp);
go producer(ch, 200*time.Millisecond, 5, wgp);
wgc.Add(1);
go consumer(ch, out, wgc)
go synchronizer(ch, out, wgp, wgc)
// print the outputs
for i:= range out {
fmt.Println(i);
}
}
使用 consumer
goroutine fan-in
来自多个 goroutine 的所有输入并从 consumer
goroutine 读取所有值。
希望对您有所帮助。
我一直在尝试解决我在Golang并发中遇到的这个简单问题。我一直在搜索所有可能的解决方案,但没有发现任何特定于我的问题(或者我可能会错过一个)。这是我的代码:
package main
import (
"fmt"
"time"
)
func producer(ch chan int, d time.Duration, num int) {
for i:=0; i<num; i++ {
ch <- i
time.Sleep(d)
}
}
func main() {
ch := make(chan int)
go producer(ch, 100*time.Millisecond, 2)
go producer(ch, 200*time.Millisecond, 5)
for {
fmt.Println(<-ch)
}
close(ch)
}
它打印错误:
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive]: main.main() D:/Code/go/src/testconcurrency/main.go:23 +0xca exit status 2
避免此错误的有效方法是什么?谢谢。
问题是 <-ch
正在阻塞,因此如果您不向通道添加任何新值,它将永远阻塞。一种方法是用一个开关 select 替换它,该开关也会阻塞但允许在多个频道上收听。您还必须添加退出通道。在您的示例中,一旦退出通道收到两个值,我们就可以中断。 break 语句需要一个标签,因为我们想退出 switch 和 for 循环。
https://play.golang.org/p/wGdCulZDnrx
另一种方法是有多个输入通道,并在发送完成后立即关闭它们。为此,每个 goroutine 都需要有自己的通道,否则我们将在第一个 goroutine 完成时退出。
第三个选项是创建一个合并函数,将多个通道合并为一个。这允许将通道的创建移动到生产者中,因此它们在一个位置创建、填充和关闭。 merge函数比较复杂,但是已经脱离了业务逻辑代码,可以单独理解和测试。然后主要代码被简化为:
ch1 := producer(100*time.Millisecond, 2)
ch2 := producer(200*time.Millisecond, 5)
for i := range merge(ch1, ch2) {
fmt.Println(i)
}
你有 "short-lived" 的生产者,他们只在有限的时间内在通道上发送值,并且你有一个无休止的 for
循环,它无休止地从通道接收值,没有终止条件,并且通道仅在此无限循环后关闭。一旦生产者停止发送值,就会陷入僵局。
通道必须由生产者关闭,表示不再发送任何值。由于你有多个不同步的生产者(生产者之间不同步),一般你无法判断哪个先完成,所以你不能指定一个关闭通道(而且一个通道只能关闭一次, 请参阅
你必须"coordinate"生产者,当所有人都完成他们的工作时,协调者应该关闭频道。
并且消费者应该在通道上使用 for range
,因为 for range
构造从通道接收在通道关闭之前发送到它的所有值,然后它会自动终止。
协调建议使用sync.WaitGroup
. Whether you use a global one in this case or a local one and you pass it to producers is up to you. Using a local will make the solution more general and easier to extend. One thing to note is that you must pass a pointer to sync.WaitGroup
. Whenever you spin up a new producer, increment the waitgroup using WaitGroup.Add()
. When a producer is done, it can signal this using WaitGroup.Done()
, preferably using defer
(so it runs no matter what, mitigating the deadlock in case of abnormal circumstances). And the controller can wait for all producers to finish using WaitGroup.Wait()
。
这是一个完整的解决方案:
func producer(ch chan int, d time.Duration, num int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < num; i++ {
ch <- i
time.Sleep(d)
}
}
func main() {
wg := &sync.WaitGroup{}
ch := make(chan int)
wg.Add(1)
go producer(ch, 100*time.Millisecond, 2, wg)
wg.Add(1)
go producer(ch, 200*time.Millisecond, 5, wg)
go func() {
wg.Wait()
close(ch)
}()
for v := range ch {
fmt.Println(v)
}
}
输出(在 Go Playground 上尝试):
0
0
1
1
2
3
4
参见相关问题:
这个问题可以使用两个等待组以优雅的方式解决。通过关闭通道 ch
,我们向消费者发出没有更多数据的信号。
解决方案适用于更多消费者。
package main
import (
"fmt"
"sync"
"time"
)
func producer(ch chan<- int, d time.Duration, num int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < num; i++ {
ch <- i
time.Sleep(d)
}
}
func consumer(ch <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for x := range ch {
fmt.Println(x)
}
}
func main() {
ch := make(chan int)
producers := &sync.WaitGroup{}
consumers := &sync.WaitGroup{}
producers.Add(2)
go producer(ch, 100*time.Millisecond, 2, producers)
go producer(ch, 200*time.Millisecond, 5, producers)
consumers.Add(1)
go consumer(ch, consumers)
producers.Wait()
close(ch)
consumers.Wait()
}
更简单的答案-其中一个生产者需要关闭通道,而消费者可以在通道上进行范围。
package main
import (
"fmt"
"time"
)
func producer(ch chan int, d time.Duration, num int, closer bool) {
for i:=0; i<num; i++ {
ch <- i
time.Sleep(d)
}
if closer {
close(ch)
}
}
func main() {
ch := make(chan int)
go producer(ch, 100*time.Millisecond, 2, false)
go producer(ch, 200*time.Millisecond, 5, true)
for i := range ch {
fmt.Println(i)
}
}
当然,除非您知道哪个生产者总是最后完成,否则您不会希望在实际代码中执行此操作。更好的设计在其他答案的 WaitGroup-based 模式中。但这是这段代码避免死锁的最简单方法。
您需要同步您的 goroutine 中的所有异步进程。您的主线程和 goroutine 线程不是同步进程。你的主线程永远不知道什么时候停止从 goroutines 调用通道。由于你的主线程在通道上循环,它总是从通道调用值,当 goroutines 完成并且通道停止发送值时,你的主线程无法从通道获取更多值,因此情况变成死锁。为避免这种情况,请使用 sync.WaitGroup
同步异步过程。
代码如下:
package main
import (
"fmt"
"time"
"sync"
)
func producer(ch chan int, d time.Duration, num int, wg *sync.WaitGroup) {
for i:=0; i<num; i++ {
ch <- i;
time.Sleep(d);
}
defer wg.Done();
}
func main() {
wg := &sync.WaitGroup{}
ch := make(chan int);
wg.Add(2);
go producer(ch, 100*time.Millisecond, 2, wg);
go producer(ch, 200*time.Millisecond, 5, wg);
go func() {
wg.Wait()
close(ch)
}()
// print the outputs
for i:= range ch {
fmt.Println(i);
}
}
https://play.golang.org/p/euMTGTIs83g
希望对您有所帮助。
由于我的解决方案看起来与已经回答的有点相似,所以我在修改之前将其更改为原始答案以适应 OP 问题。
代码如下:
package main
import (
"fmt"
"time"
"sync"
)
// producer produce values tobe sent to consumer
func producer(ch chan int, d time.Duration, num int, wg *sync.WaitGroup) {
defer wg.Done();
for i:=0; i<num; i++ {
ch <- i;
time.Sleep(d);
}
}
// consumer consume all values from producers
func consumer(ch chan int, out chan int, wg *sync.WaitGroup) {
defer wg.Done();
for i:= range ch {
out <- i
}
}
// synchronizer synchronize all goroutines to avoid deadlocks
func synchronizer(ch chan int, out chan int, wgp *sync.WaitGroup, wgc *sync.WaitGroup) {
wgp.Wait()
close(ch)
wgc.Wait()
close(out)
}
func main() {
wgp := &sync.WaitGroup{}
wgc := &sync.WaitGroup{}
ch := make(chan int);
out := make(chan int);
wgp.Add(2);
go producer(ch, 100*time.Millisecond, 2, wgp);
go producer(ch, 200*time.Millisecond, 5, wgp);
wgc.Add(1);
go consumer(ch, out, wgc)
go synchronizer(ch, out, wgp, wgc)
// print the outputs
for i:= range out {
fmt.Println(i);
}
}
使用 consumer
goroutine fan-in
来自多个 goroutine 的所有输入并从 consumer
goroutine 读取所有值。
希望对您有所帮助。