为什么这个 golang 脚本让我陷入僵局? + 几个问题

Why is this golang script giving me a deadlock ? + a few questions

我从 github 上的某个人那里得到了这段代码,我正在尝试使用它来理解并发性。

package main

import (
    "bufio"
    "fmt"
    "os"
    "sync"
    "time"
)

var wg sync.WaitGroup

func sad(url string) string {
    fmt.Printf("gonna sleep a bit\n")
    time.Sleep(2 * time.Second)
    return url + " added stuff"
}

func main() {
    sc := bufio.NewScanner(os.Stdin)
    urls := make(chan string)
    results := make(chan string)

    for i := 0; i < 20; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for url := range urls {
                n := sad(url)
                results <- n
            }
        }()
    }

    for sc.Scan() {
        url := sc.Text()
        urls <- url
    }

    for result := range results {
        fmt.Printf("%s arrived\n", result)
    }

    wg.Wait()
    close(urls)
    close(results)
}

我有几个问题:

  1. 为什么这段代码会出现死锁?
  2. for 循环是如何存在的从用户接收输入的操作之前,go 例程是否会等到 urls 通道中传递任何内容然后开始工作?我不明白这个,因为它不是顺序的,比如为什么从用户那里获取输入然后将每个输入放入 urls 通道然后 运行 go routines 被认为是错误的?
  3. 在 for 循环中,我有 另一个循环 ,它在 urls 通道上迭代,每个 go 例程是否只处理一行输入?还是一个 go 例程一次处理多行?这些是如何工作的?
  4. 我在这里收集的输出是否正确?

for 循环创建了 20 个 goroutine,它们都在等待来自 urls 通道的输入。当有人写入此通道时,其中一个 goroutine 将拾取它并在其中继续工作。这是一个典型的工作池实现。

然后,scanner 逐行读取输入,并将其发送到 urls 通道,其中一个 goroutine 将拾取它并将响应写入 results 通道。此时,没有其他 goroutines 从 results 通道读取,所以这将阻塞。

当扫描器读取 URL 时,所有其他 goroutine 将拾取它们并阻止。因此,如果扫描器读取超过 20 个 URL,它将死锁,因为所有 goroutine 都将等待 reader.

如果 URL 少于 20 个,扫描器 for 循环将结束,并读取结果。然而,这最终也会死锁,因为 for 循环将在通道关闭时终止,并且没有人可以关闭通道。

要解决此问题,首先,请在阅读完后立即关闭 urls 频道。这将释放 goroutines 中的所有 for 循环。然后,您应该将从 results 通道读取的 for 循环放入 goroutine 中,这样您就可以在处理结果时调用 wg.Waitwg.Wait后,您可以关闭results频道。

这不能保证 results 频道中的所有项目都会被阅读。该程序可能会在处理完所有消息之前终止,因此请使用您在从 results 通道读取的 goroutine 末尾关闭的第三个通道。即:

done:=make(chan struct{})
go func() {
  defer close(done)
  for result := range results {
        fmt.Printf("%s arrived\n", result)
    }
}()
wg.Wait()
close(results)
<-done

大部分情况下您做事都正确,但也有一些不合时宜的地方。 for sc.Scan() 循环将继续,直到 Scanner 完成,而 for result := range results 循环永远不会 运行,因此没有 go 例程(在这种情况下为 'main')将能够接收来自results。当 运行 使用您的示例时,我在 for sc.Scan() 之前启动了 for result := range results 循环,并且还在其自己的 go 例程中启动了循环——否则将永远无法到达 for sc.Scan()

go func() {
    for result := range results {
        fmt.Printf("%s arrived\n", result)
    }
}()

for sc.Scan() {
    url := sc.Text()
    urls <- url
}

另外,因为你 运行 wg.Wait()close(urls) 之前,主 goroutine 被阻塞等待 20 sad() go routines 完成。但是在调用 close(urls) 之前他们无法完成。所以在等待等待组之前关闭那个通道。

close(urls)
wg.Wait()
close(results)

我对以前的答案不是很满意,所以这里有一个基于 go tour, the go doc, the specifications.

中记录的行为的解决方案
package main

import (
    "bufio"
    "fmt"
    "strings"
    "sync"
    "time"
)

var wg sync.WaitGroup

func sad(url string) string {
    fmt.Printf("gonna sleep a bit\n")
    time.Sleep(2 * time.Millisecond)
    return url + " added stuff"
}

func main() {
    // sc := bufio.NewScanner(os.Stdin)
    sc := bufio.NewScanner(strings.NewReader(strings.Repeat("blah blah\n", 15)))
    urls := make(chan string)
    results := make(chan string)

    for i := 0; i < 20; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for url := range urls {
                n := sad(url)
                results <- n
            }
        }()
    }
    // results is consumed by so many goroutines
    // we must wait for them to finish before closing results
    // but we dont want to block here, so put that into a routine.
    go func() {
        wg.Wait()
        close(results)
    }()

    go func() {
        for sc.Scan() {
            url := sc.Text()
            urls <- url
        }
        close(urls) // done consuming a channel, close it, right away.
    }()

    for result := range results {
        fmt.Printf("%s arrived\n", result)
    } // the program will finish when it gets out of this loop.
    // It will get out of this loop because you have made sure the results channel is closed.

}