通道关闭但所有 goroutines 都在睡觉 - 死锁

A channel is closed but all goroutines are asleep - deadlock

是的,它看起来像是 Whosebug 上最重复的问题之一,但请花几分钟时间回答这个问题。

func _Crawl(url string, fetcher Fetcher, ch chan []string) {
    if store.Read(url) == true {
        return
    } else {
        store.Write(url)
    }

    body, urls, err := fetcher.Fetch(url)
    if err != nil {
        fmt.Printf("not found: %s\n", url)
    }
    fmt.Printf("found: %s %q\n", url, body)
    ch <- urls
}

func Crawl(url string, fetcher Fetcher) {
    UrlChannel := make(chan []string, 4)
    go _Crawl(url, fetcher, UrlChannel)
    for urls, ok := <- UrlChannel; ok; urls, ok = <- UrlChannel{
        for _, i := range urls {
            go _Crawl(i, fetcher, UrlChannel)
        }
    }
    close(UrlChannel) //The channel is closed.
 }

func main() {
   Crawl("http://golang.org/", fetcher)
}

循环结束后我将关闭频道。程序 returns 正确结果但在最后引发错误:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive]:

main.Crawl(0x113a2f, 0x12, 0x1800d0, 0x10432220)

/tmp/sandbox854979773/main.go:55 +0x220

main.main()

/tmp/sandbox854979773/main.go:61 +0x60

我的 goroutine 有什么问题?

好吧,乍一看后,您可以使用以下范围来缩短 for

for urls := range UrlChannel { ... }

它将迭代直到通道关闭并且看起来好多了。

此外,您在函数 _Crawl() 的第一个 if 中还有一个早期的 return,所以如果第一个条件为真函数将结束,并且不会向通道传递任何内容,因此从该通道接收的代码将永远等待。

另外,在你的第二个 for 中,你正在为每个 url 创建 goroutines,但你并没有等待它们,实际上那些 goroutines 会尝试发送一些东西到一个封闭的频道。似乎这并没有发生,因为在这种情况下代码会崩溃,您可以为此使用 WaitGroup

在简历中,您的代码有几种可能的死锁情况。

|||超级编辑|||:

我应该写信给你,你的代码有点乱,解决方案可能很简单 WaitGroup,但我害怕让你难过,因为我发现了太多问题,但如果你真的想要学习如何编写并发代码,您应该首先考虑完全没有并发的代码或伪代码,然后尝试添加魔法。

在你的情况下,我看到的是一个递归解决方案,因为 url 是从 HTML 文档中以树的形式获取的,类似于 DFS:

func crawl(url string, fetcher Fetcher) {
    // if we've visited this url just stop the recursion
    if store.Read(url) == true {
       return
    }
    store.Write(url)

    body, urls, err := fetcher.Fetch(url)
    if err != nil {
        fmt.Printf("not found: %s\n", url)
        return // early return if there's no urls there's no reason to continue
    }
    fmt.Printf("found: %s %q\n", url, body)

    // this part will change !!
    // ...
    for _, url := range urls {
        crawl(url, fetcher)
    }
    //
}

func main() {
    crawl("http://golang.org", fetcher)
}

现在第二步识别并发代码,在这种情况下很容易,因为每个 url 都可以同时获取(有时是并行的),我们只需添加一个 WaitGroup 并为每个 [= 创建一个 goroutine 63=],现在只需要更新 for 来获取 urls(它只是 for 块):

// this code will be in the comment: "this part will change !!"
//
// this var is just a thread-safe counter
var wg sync.WaitGroup

// set the WaitGroup counter with the len of urls slice
wg.Add(len(urls))

for _, url := range urls {

    // it's very important pass the url as a parameter 
    // because the var url changes for each loop (url := range)
    go func(u string) {

        // Decrement the counter (-1) when the goroutine completes
        defer wg.Done()

        crawl(u, fetcher)

    }(url)
}

wg.Wait() // wait for all your goroutines
// ...

未来的考虑,也许你想控制 goroutines(或 worker)的数量,因为你必须使用 Fan In 或 Fan Out 之类的东西,你可以在这里找到更多: https://blog.golang.org/advanced-go-concurrency-patternshttps://blog.golang.org/pipelines

但是不要害怕在 Go 中创建数以千计的 goroutine,它们是 very cheap

注意:我没有编译代码,可能某处有小错误 :)

上述两种解决方案和 range 通过通道循环都存在相同的问题。 问题是通道关闭后会结束循环,但循环结束后通道会关闭。 所以我们需要知道什么时候关闭打开的渠道。我相信我们需要计算启动的作业(goroutines)。 但在这种情况下,我只是丢失了一个计数器变量。既然是巡回练习,应该不复杂。

func _Crawl(url string, fetcher Fetcher, ch chan []string) {
    if store.Read(url) == false {
        store.Write(url)    
        body, urls, err := fetcher.Fetch(url)
        if err != nil {
            fmt.Printf("not found: %s\n", url)
        } else {
            fmt.Printf("found: %s %q\n", url, body)
        }
        ch <- urls
    }
}

func Crawl(url string, depth int, fetcher Fetcher) {
    UrlChannel := make(chan []string, 4)
    go _Crawl(url, fetcher, UrlChannel)
    for urls := range UrlChannel {
        for _, url := range urls {
            go _Crawl(url, fetcher, UrlChannel)
        }
        depth--
        if depth < 0 {
            close(UrlChannel)
        }
    }
}