解决 goroutines 死锁

Solving goroutines deadlock

我一直在尝试解决我在Golang并发中遇到的这个简单问题。我一直在搜索所有可能的解决方案,但没有发现任何特定于我的问题(或者我可能会错过一个)。这是我的代码:

package main

import (
    "fmt"
    "time"
)

func producer(ch chan int, d time.Duration, num int) {

    for i:=0; i<num; i++ {
        ch <- i
        time.Sleep(d)
    }
}

func main() {
    ch := make(chan int)

    go producer(ch, 100*time.Millisecond, 2)
    go producer(ch, 200*time.Millisecond, 5)

    for {
        fmt.Println(<-ch)    
    }

    close(ch)
}

它打印错误:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive]: main.main() D:/Code/go/src/testconcurrency/main.go:23 +0xca exit status 2

避免此错误的有效方法是什么?谢谢。

问题是 <-ch 正在阻塞,因此如果您不向通道添加任何新值,它将永远阻塞。一种方法是用一个开关 select 替换它,该开关也会阻塞但允许在多个频道上收听。您还必须添加退出通道。在您的示例中,一旦退出通道收到两个值,我们就可以中断。 break 语句需要一个标签,因为我们想退出 switch 和 for 循环。

https://play.golang.org/p/wGdCulZDnrx

另一种方法是有多个输入通道,并在发送完成后立即关闭它们。为此,每个 goroutine 都需要有自己的通道,否则我们将在第一个 goroutine 完成时退出。

第三个选项是创建一个合并函数,将多个通道合并为一个。这允许将通道的创建移动到生产者中,因此它们在一个位置创建、填充和关闭。 merge函数比较复杂,但是已经脱离了业务逻辑代码,可以单独理解和测试。然后主要代码被简化为:

ch1 := producer(100*time.Millisecond, 2)
ch2 := producer(200*time.Millisecond, 5)

for i := range merge(ch1, ch2) {
    fmt.Println(i)
}

https://play.golang.org/p/2mv8ILhJPIB

合并函数来自 https://blog.golang.org/pipelines

你有 "short-lived" 的生产者,他们只在有限的时间内在通道上发送值,并且你有一个无休止的 for 循环,它无休止地从通道接收值,没有终止条件,并且通道仅在此无限循环后关闭。一旦生产者停止发送值,就会陷入僵局。

通道必须由生产者关闭,表示不再发送任何值。由于你有多个不同步的生产者(生产者之间不同步),一般你无法判断哪个先完成,所以你不能指定一个关闭通道(而且一个通道只能关闭一次, 请参阅 ; and ).

你必须"coordinate"生产者,当所有人都完成他们的工作时,协调者应该关闭频道。

并且消费者应该在通道上使用 for range,因为 for range 构造从通道接收在通道关闭之前发送到它的所有值,然后它会自动终止。

协调建议使用sync.WaitGroup. Whether you use a global one in this case or a local one and you pass it to producers is up to you. Using a local will make the solution more general and easier to extend. One thing to note is that you must pass a pointer to sync.WaitGroup. Whenever you spin up a new producer, increment the waitgroup using WaitGroup.Add(). When a producer is done, it can signal this using WaitGroup.Done(), preferably using defer (so it runs no matter what, mitigating the deadlock in case of abnormal circumstances). And the controller can wait for all producers to finish using WaitGroup.Wait()

这是一个完整的解决方案:

func producer(ch chan int, d time.Duration, num int, wg *sync.WaitGroup) {
    defer wg.Done()

    for i := 0; i < num; i++ {
        ch <- i
        time.Sleep(d)
    }
}

func main() {
    wg := &sync.WaitGroup{}
    ch := make(chan int)

    wg.Add(1)
    go producer(ch, 100*time.Millisecond, 2, wg)
    wg.Add(1)
    go producer(ch, 200*time.Millisecond, 5, wg)

    go func() {
        wg.Wait()
        close(ch)
    }()

    for v := range ch {
        fmt.Println(v)
    }
}

输出(在 Go Playground 上尝试):

0
0
1
1
2
3
4

参见相关问题:

这个问题可以使用两个等待组以优雅的方式解决。通过关闭通道 ch,我们向消费者发出没有更多数据的信号。

解决方案适用于更多消费者。

package main

import (
    "fmt"
    "sync"
    "time"
)

func producer(ch chan<- int, d time.Duration, num int, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 0; i < num; i++ {
        ch <- i
        time.Sleep(d)
    }
}

func consumer(ch <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for x := range ch {
        fmt.Println(x)
    }
}

func main() {
    ch := make(chan int)
    producers := &sync.WaitGroup{}
    consumers := &sync.WaitGroup{}

    producers.Add(2)
    go producer(ch, 100*time.Millisecond, 2, producers)
    go producer(ch, 200*time.Millisecond, 5, producers)

    consumers.Add(1)
    go consumer(ch, consumers)

    producers.Wait()
    close(ch)
    consumers.Wait()
}

更简单的答案-其中一个生产者需要关闭通道,而消费者可以在通道上进行范围。

package main

import (
    "fmt"
    "time"
)

func producer(ch chan int, d time.Duration, num int, closer bool) {

    for i:=0; i<num; i++ {
        ch <- i
        time.Sleep(d)   
    }
    if closer {
        close(ch)
    }
}

func main() {
    ch := make(chan int)

    go producer(ch, 100*time.Millisecond, 2, false)
    go producer(ch, 200*time.Millisecond, 5, true)

    for i := range ch {
        fmt.Println(i)
    }

}

当然,除非您知道哪个生产者总是最后完成,否则您不会希望在实际代码中执行此操作。更好的设计在其他答案的 WaitGroup-based 模式中。但这是这段代码避免死锁的最简单方法。

您需要同步您的 goroutine 中的所有异步进程。您的主线程和 goroutine 线程不是同步进程。你的主线程永远不知道什么时候停止从 goroutines 调用通道。由于你的主线程在通道上循环,它总是从通道调用值,当 goroutines 完成并且通道停止发送值时,你的主线程无法从通道获取更多值,因此情况变成死锁。为避免这种情况,请使用 sync.WaitGroup 同步异步过程。

代码如下:

package main

import (
    "fmt"
    "time"
    "sync"
)

func producer(ch chan int, d time.Duration, num int, wg *sync.WaitGroup) {
    for i:=0; i<num; i++ {
        ch <- i;
        time.Sleep(d);
    }
    defer wg.Done();
}

func main() {
    wg  := &sync.WaitGroup{}
    ch  := make(chan int);

    wg.Add(2);
    go producer(ch, 100*time.Millisecond, 2, wg);
    go producer(ch, 200*time.Millisecond, 5, wg);

    go func() {   
    wg.Wait()
    close(ch)
    }()

    // print the outputs
    for i:= range ch {
        fmt.Println(i);
    }
}

https://play.golang.org/p/euMTGTIs83g

希望对您有所帮助。

由于我的解决方案看起来与已经回答的有点相似,所以我在修改之前将其更改为原始答案以适应 OP 问题。

代码如下:

package main

import (
    "fmt"
    "time"
    "sync"
)

// producer produce values tobe sent to consumer
func producer(ch chan int, d time.Duration, num int, wg *sync.WaitGroup) {
    defer wg.Done();
    for i:=0; i<num; i++ {
        ch <- i;
        time.Sleep(d);
    }
}

// consumer consume all values from producers
func consumer(ch chan int, out chan int, wg *sync.WaitGroup) {
    defer wg.Done();
    for i:= range ch {
        out <- i
    }
}

// synchronizer synchronize all goroutines to avoid deadlocks
func synchronizer(ch chan int, out chan int, wgp *sync.WaitGroup, wgc *sync.WaitGroup) {
    wgp.Wait()
    close(ch)
    wgc.Wait()
    close(out)
}

func main() {
    wgp  := &sync.WaitGroup{}
    wgc  := &sync.WaitGroup{}
    ch  := make(chan int);
    out := make(chan int);

    wgp.Add(2);
    go producer(ch, 100*time.Millisecond, 2, wgp);
    go producer(ch, 200*time.Millisecond, 5, wgp);

    wgc.Add(1);
    go consumer(ch, out, wgc)

    go synchronizer(ch, out, wgp, wgc)

    // print the outputs
    for i:= range out {
        fmt.Println(i);
    }
}

使用 consumer goroutine fan-in 来自多个 goroutine 的所有输入并从 consumer goroutine 读取所有值。

希望对您有所帮助。