Goroutines、通道和死锁

Goroutines, Channels and Deadlock

我想更多地了解 go 的通道和 goroutines,所以我决定制作一个小程序来计算文件中的单词,由 bufio.NewScanner 对象读取:

nCPUs := flag.Int("cpu", 2, "number of CPUs to use")
flag.Parse()
runtime.GOMAXPROCS(*nCPUs)    

scanner := bufio.NewScanner(file)
lines := make(chan string)
results := make(chan int)

for i := 0; i < *nCPUs; i++ {
    go func() {
        for line := range lines {
            fmt.Printf("%s\n", line)
            results <- len(strings.Split(line, " "))
        }
    }()
}

for scanner.Scan(){
    lines <- scanner.Text()
}
close(lines)


acc := 0
for i := range results {
      acc += i
 }

fmt.Printf("%d\n", acc)

现在,在我发现的大多数示例中,linesresults 通道都会被缓冲,例如 make(chan int, NUMBER_OF_LINES_IN_FILE)。尽管如此,在 运行 这段代码之后,我的程序仍然存在 fatal error: all goroutines are asleep - deadlock! 错误消息。

基本上我认为我需要两个通道:一个用于与 goroutine 通信文件中的行(因为它可以是任何大小,我不想认为我需要通知大小make(chan) 函数调用。另一个通道将从 goroutine 收集结果,在主函数中我将使用它来计算累积结果。

以这种方式使用 goroutines 和通道进行编程的最佳选择应该是什么?非常感谢任何帮助。

go 中的通道不受 default 缓冲,这意味着您生成的 none 个匿名 goroutine 可以发送到 results 通道,直到您开始试图从那个频道接收。在 scanner.Scan() 完成填充 line 通道之前,它不会在主程序中开始执行...这是在您的匿名函数可以发送到 results 通道并重新启动它们的循环之前,您无法执行此操作。死锁。

你的代码中的另一个问题,即使通过缓冲通道简单地修复了上述问题,for i := range results 也会在没有更多结果时死锁被送入其中,因为频道还没有关闭。

编辑:如果您想避免 缓冲频道,这里有一个潜在的solution。基本上,第一个问题是通过一个新的 goroutine 执行发送到 results 通道来避免的,从而允许 lines 循环完成。第二个问题(不知道何时停止读取通道)可以通过在创建 goroutine 时对其进行计数并在每个 goroutine 被计算在内时显式关闭通道来避免。对等待组做类似的事情可能更好,但这只是展示如何在无缓冲的情况下执行此操作的一种非常快速的方法。

正如@AndrewN 所指出的,问题是每个 goroutine 都到达了它试图发送到 results 通道的地步,但是这些发送将被阻塞,因为 results 通道是无缓冲的在 for i := range results 循环之前,没有任何内容从他们那里读取。你永远不会进入那个循环,因为你首先需要完成 for scanner.Scan() 循环,它试图将所有 line 发送到 lines 通道,这个通道被阻塞是因为 goroutines 是永远不会循环回到 range lines 因为他们被困在发送到 results.

要解决此问题,您可能会尝试做的第一件事是将 scanner.Scan() 内容放入 goroutine 中,以便某些内容可以立即开始读取 results 频道。但是,您将遇到的下一个问题是知道何时结束 for i := range results 循环。你想要关闭 results 通道,但只有在原始 goroutines 完成读取 lines 通道之后。您可以在关闭 lines 通道后立即关闭 results 通道,但是我认为这可能会引入潜在的竞争,所以最安全的做法也是等待原来的两个 goroutine 完成后再关闭results 频道:(playground link):

package main

import "fmt"
import "runtime"
import "bufio"
import "strings"
import "sync"

func main() {
    runtime.GOMAXPROCS(2)

    scanner := bufio.NewScanner(strings.NewReader(`
hi mom
hi dad
hi sister
goodbye`))
    lines := make(chan string)
    results := make(chan int)

    wg := sync.WaitGroup{}
    for i := 0; i < 2; i++ {
        wg.Add(1)
        go func() {
            for line := range lines {
                fmt.Printf("%s\n", line)
                results <- len(strings.Split(line, " "))
            }
            wg.Done()
        }()
    }

    go func() {
        for scanner.Scan() {
            lines <- scanner.Text()
        }
        close(lines)
        wg.Wait()
        close(results)
    }()

    acc := 0
    for i := range results {
        acc += i
    }

    fmt.Printf("%d\n", acc)
}