当我在 goroutine 中 运行 wg.Wait() 时,为什么我的代码可以正常工作?

Why does my code work correctly when I run wg.Wait() inside a goroutine?

我有一个要抓取的网址列表。我想做的是将所有成功抓取的页面数据存储到一个通道中,当我完成后,将其转储到一个切片中。我不知道我会得到多少成功的抓取,所以我不能指定一个固定的长度。我希望代码到达 wg.Wait() 然后等到调用所有 wg.Done() 方法,但我从未到达 close(queue) 语句。寻找类似的答案,我遇到了这个 SO answer

作者做了类似的事情:

ports := make(chan string)
toScan := make(chan int)
var wg sync.WaitGroup

// make 100 workers for dialing
for i := 0; i < 100; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        for p := range toScan {
            ports <- worker(*host, p)
        }
    }()
}

// close our receiving ports channel once all workers are done
go func() {
    wg.Wait()
    close(ports)
}()

一旦我将 wg.Wait() 包裹在 goroutine 中,就达到了 close(queue)

urls := getListOfURLS()
activities := make([]Activity, 0, limit)
queue := make(chan Activity)
for i, activityURL := range urls {
    wg.Add(1)
    go func(i int, url string) {
        defer wg.Done()
        activity, err := extractDetail(url)
        if err != nil {
            log.Println(err)
            return
        }
        queue <- activity
    }(i, activityURL)
}
    // calling it like this without the goroutine causes the execution to hang
// wg.Wait() 
// close(queue)

    // calling it like this successfully waits
go func() {
    wg.Wait()
    close(queue)
}()
for a := range queue {
    // block channel until valid url is added to queue
    // once all are added, close it
    activities = append(activities, a)
}

如果我不为 wg.Wait() 使用协程,为什么代码无法到达 close?我认为所有 defer wg.Done() 语句都被调用,所以最终它会清除,因为它到达 wg.Wait()。跟我频道接收值有关系吗?

您需要等待 goroutines 在单独的线程中完成,因为 queue 需要从中读取。当您执行以下操作时:

queue := make(chan Activity)
for i, activityURL := range urls {
    wg.Add(1)
    go func(i int, url string) {
        defer wg.Done()
        activity, err := extractDetail(url)
        if err != nil {
            log.Println(err)
            return
        }
        queue <- activity // nothing is reading data from queue.
    }(i, activityURL)
}

wg.Wait() 
close(queue)

for a := range queue {
    activities = append(activities, a)
}

每个 goroutine 在 queue <- activity 阻塞,因为 queue 是无缓冲的,没有任何东西从它读取数据。这是因为 queue 上的范围循环在 wg.Wait.

之后的主线程中

wg.Wait 只会解锁一次所有 goroutine return。但如前所述,所有 goroutine 都在通道发送时被阻塞。

当你使用单独的goroutine等待时,代码执行实际上到达queue上的范围循环。

// wg.Wait does not block the main thread.
go func() {
    wg.Wait()
    close(queue)
}()

这导致 goroutines 在 queue <- activity 语句(主线程开始读取 queue)和 运行 处解除阻塞,直到完成。依次调用每个人 wg.Done.

一旦等待的 goroutine 通过 wg.Waitqueue 关闭,主线程退出范围循环。

queue 通道是无缓冲的,所以每个试图写入它的 goroutine 都会被阻塞,因为 reader 进程还没有启动。所以没有 goroutinte 可以写,它们都挂了——结果 wg.Wait 永远等待。 尝试在单独的 goroutine 中启动 reader:

go func() {
    for a := range queue {
        // block channel until valid url is added to queue
        // once all are added, close it
       activities = append(activities, a)
    }
}()

然后启动服务员:

wg.Wait() 
close(queue)

这样你就不会在通道中累积所有数据并使其过载,而是在数据到来时获取数据并将其放入目标切片。