等待所有 goroutines 完成并合并结果

Waiting for all goroutines to finish and combining the results

我很难理解 goroutines、通道和所有同步的东西。我相信我理解这些概念,但我缺少几行来连接我拥有的所有信息。另外,大多数例子感觉太简单了,所以我无法正确理解实际发生的事情。

我正在为网站编写一个简单的分析工具。其中一项功能是检查是否可以访问该网站上的所有链接。显然,每个网站上都有很多链接,因此它似乎是一个很好的 goroutines 候选者。问题是,在安排所有 goroutine 之后,我需要取回所有结果,以便一次性将它们全部呈现给用户。

我目前拥有的是:

func links(u *url.URL, d *goquery.Document) (links []models.Link) {
    wg := sync.WaitGroup{}

    d.Find("a[href]").Each(func(index int, item *goquery.Selection) {
        go func() {
            wg.Add(1)
            href, _ := item.Attr("href")
            url, _ := url.Parse(href)
            var internal bool

            if url.Host == "" {
                url.Scheme = u.Scheme
                url.Host = u.Host
            }

            links = append(links, models.Link{
                URL:       url,
                Reachable: Reachable(url.String()),
            })

            wg.Done()
        }()
    })

    wg.Wait()

    return
}

func Reachable(u string) bool {
    res, err := http.Head(u)
    if err != nil {
        return false
    }

    return res.StatusCode == 200
}

我的代码似乎可以工作,但我觉得我错过了一些东西(或者至少它可能会更好)。我有几个 concerns/questions:

  1. 如果网站包含 1000 个链接,我会生成 1000 个 goroutine,我相信它不是那么聪明。可能我需要一个工作池或类似的东西,对吗?
  2. 是否可以只为这个例子使用频道?我不知道 goquery 会找到多少个链接,所以我无法轻易 range 处理发送到频道的元素。另外,我不能轻易地向另一个频道发送一些 done 消息,因为我不知道这个 Each 什么时候结束。此频道上的每个 for range 都处于阻塞状态,因此应用正在恢复同步。
  3. 我相信这在应用程序中很常见,您开始对某些内容进行迭代,并且希望在每次迭代时执行一些异步工作并在迭代结束时收集所有结果。我无法理解这个概念。我不知道如何处理这种情况。

您可以使用信号量来限制并发。这仍然会产生“1000 个 goroutines”,但确保在给定时间只有 5 个 http 请求在运行。您可以更改 maxParallel 的值以增加或减少并行请求的数量。

func links(u *url.URL, d *goquery.Document) (links []models.Link) {
    wg := sync.WaitGroup{}
    linkChan := make(chan models.Link)
    doneChan := make(chan struct{})
    maxParallel := 5
    semaphore := make(chan struct{}, maxParallel)
    d.Find("a[href]").Each(func(index int, item *goquery.Selection) {
        wg.Add(1)
        go func() {
            semaphore <- struct{}{}
            href, _ := item.Attr("href")
            url, _ := url.Parse(href)

            if url.Host == "" {
                url.Scheme = u.Scheme
                url.Host = u.Host
            }
            linkChan <- models.Link{
                URL:       url,
                Reachable: Reachable(url.String()),
            }
            wg.Done()
            <-semaphore
        }()
    })
    go func() {
        wg.Wait()
        doneChan <- struct{}{}
    }()

    // Drain the channel
    for {
        select {
        case l := <-linkChan:
            links = append(links, l)
        case <-doneChan:
            return
        }
    }
    return
}

func Reachable(u string) bool {
    res, err := http.Head(u)
    if err != nil {
        return false
    }

    return res.StatusCode == 200
}