如何从 url 池发出并发 GET 请求

How to make concurrent GET requests from url pool

我完成了建议的 go-tour,在 YouTube 上观看了一些教程和 gopher-conferences。差不多就是这些了。

我有一个项目需要我发送获取请求并将结果存储在文件中。但是URL的数量是8000万左右。

我只用 1000 URL 进行测试。

问题:我想我无法让它并发,尽管我遵循了一些指导方针。我不知道出了什么问题。但也许我错了,它是并发的,对我来说似乎并不快,速度感觉像顺序请求。

这是我写的代码:

package main

import (
    "bufio"
    "io/ioutil"
    "log"
    "net/http"
    "os"
    "sync"
    "time"
)

var wg sync.WaitGroup // synchronization to wait for all the goroutines

func crawler(urlChannel <-chan string) {
    defer wg.Done()
    client := &http.Client{Timeout: 10 * time.Second} // single client is sufficient for multiple requests

    for urlItem := range urlChannel {
        req1, _ := http.NewRequest("GET", "http://"+urlItem, nil)                                           // generating the request
        req1.Header.Add("User-agent", "Mozilla/5.0 (X11; Linux i586; rv:31.0) Gecko/20100101 Firefox/74.0") // changing user-agent
        resp1, respErr1 := client.Do(req1)                                                                  // sending the prepared request and getting the response
        if respErr1 != nil {
            continue
        }

        defer resp1.Body.Close()

        if resp1.StatusCode/100 == 2 { // means server responded with 2xx code
            text1, readErr1 := ioutil.ReadAll(resp1.Body) // try to read the sourcecode of the website
            if readErr1 != nil {
                log.Fatal(readErr1)
            }

            f1, fileErr1 := os.Create("200/" + urlItem + ".txt") // creating the relative file
            if fileErr1 != nil {
                log.Fatal(fileErr1)
            }
            defer f1.Close()

            _, writeErr1 := f1.Write(text1) // writing the sourcecode into our file
            if writeErr1 != nil {
                log.Fatal(writeErr1)
            }
        }
    }
}

func main() {
    file, err := os.Open("urls.txt") // the file containing the url's
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close() // don't forget to close the file

    urlChannel := make(chan string, 1000) // create a channel to store all the url's

    scanner := bufio.NewScanner(file) // each line has another url
    for scanner.Scan() {
        urlChannel <- scanner.Text()
    }
    close(urlChannel)

    _ = os.Mkdir("200", 0755) // if it's there, it will create an error, and we will simply ignore it
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go crawler(urlChannel)
    }
    wg.Wait()
}

我的问题是:为什么这段代码不能同时工作?我怎样才能解决我上面提到的问题。我是否在发出并发 GET 请求时做错了什么?

设置并发管道时,要遵循的一个好的准则是始终首先设置并实例化将并发执行的侦听器(在您的情况下为爬虫),然后开始通过管道向它们提供数据(在你的情况,urlChannel).

在您的示例中,防止死锁的唯一方法是您实例化了一个缓冲通道,该通道的行数与测试文件的行数(1000 行)相同。代码的作用是将 URL 放入 urlChannel 中。由于您的文件中有 1000 行,因此 urlChannel 可以在不阻塞的情况下获取所有行。如果您在文件中放入更多 URL,执行将在填满 urlChannel.

后阻塞

这里是应该工作的代码版本:

package main

import (
    "bufio"
    "io/ioutil"
    "log"
    "net/http"
    "os"
    "sync"
    "time"
)

func crawler(wg *sync.WaitGroup, urlChannel <-chan string) {
    defer wg.Done()
    client := &http.Client{Timeout: 10 * time.Second} // single client is sufficient for multiple requests

    for urlItem := range urlChannel {
        req1, _ := http.NewRequest("GET", "http://"+urlItem, nil)                                           // generating the request
        req1.Header.Add("User-agent", "Mozilla/5.0 (X11; Linux i586; rv:31.0) Gecko/20100101 Firefox/74.0") // changing user-agent
        resp1, respErr1 := client.Do(req1)                                                                  // sending the prepared request and getting the response
        if respErr1 != nil {
            continue
        }

        if resp1.StatusCode/100 == 2 { // means server responded with 2xx code
            text1, readErr1 := ioutil.ReadAll(resp1.Body) // try to read the sourcecode of the website
            if readErr1 != nil {
                log.Fatal(readErr1)
            }
            resp1.Body.Close()

            f1, fileErr1 := os.Create("200/" + urlItem + ".txt") // creating the relative file
            if fileErr1 != nil {
                log.Fatal(fileErr1)
            }

            _, writeErr1 := f1.Write(text1) // writing the sourcecode into our file
            if writeErr1 != nil {
                log.Fatal(writeErr1)
            }
            f1.Close() 
        }
    }
}

func main() {
    var wg sync.WaitGroup
    file, err := os.Open("urls.txt") // the file containing the url's
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close() // don't forget to close the file

    urlChannel := make(chan string) 

    _ = os.Mkdir("200", 0755) // if it's there, it will create an error, and we will simply ignore it

    // first, initialize crawlers
    wg.Add(10)
    for i := 0; i < 10; i++ {
        go crawler(&wg, urlChannel)
    }

    //after crawlers are initialized, start feeding them data through the channel
    scanner := bufio.NewScanner(file) // each line has another url
    for scanner.Scan() {
        urlChannel <- scanner.Text()
    }
    close(urlChannel)
    wg.Wait()
}

这里有一些代码可以让您思考。我将 URL 放在代码中,所以它是 self-sufficient,但实际上您可能会将它们通过管道传输到标准输入。我在这里做的一些事情我认为是改进,或者至少值得考虑。

在我们开始之前,我要指出我将 complete url 放在了输入流中。一方面,这让我同时支持 http 和 https。我真的没有看到在代码中硬编码方案而不是将其留在数据中的逻辑。

First,它可以处理任意大小的响应主体(您的版本将 body 读入内存,因此它受到一些并发大请求填充内存的限制).我使用 io.Copy().

[已编辑]

text1, readErr1 := ioutil.ReadAll(resp1.Body) 读取整个 http body。如果 body 很大,会占用大量内存。 io.Copy(f1,resp1.Body) 会将来自 http 响应 body 的数据直接复制到文件中,而不必将整个内容保存在内存中。它可以在一个 Read/Write 或多个中完成。

http.Response.Body 是一个 io.ReadCloser 因为 HTTP 协议期望 body 被渐进读取。 http.Response 尚未 拥有 整个 body,直到它被阅读。这就是为什么它不仅仅是一个 [] 字节。当数据从 tcp 套接字“流”进来时,将其逐步写入文件系统意味着有限数量的系统资源可以下载无限数量的数据。

但还有更多好处。 io.Copy 将在文件上调用 ReadFrom()。如果您查看 linux 实现(例如): https://golang.org/src/os/readfrom_linux.go , and dig a bit, you'll see it actually uses copy_file_range 该系统调用很酷,因为

The copy_file_range() system call performs an in-kernel copy between two file descriptors without the additional cost of transferring data from the kernel to user space and then back into the kernel.

*os.File 知道如何要求内核将数据直接从 tcp 套接字传送到文件,而您的程序甚至不必接触它。

参见https://golang.org/pkg/io/#Copy

其次,我确保使用文件名中的所有 url 组件。具有不同查询字符串的 URL 会转到不同的文件。该片段可能不会区分响应主体,因此将其包含在路径中可能会被错误地考虑。没有用于将 URL 转换为有效文件路径的很棒的启发式方法——如果这是一项严肃的任务,我可能会将数据存储在基于 url 之类的 shasum 的文件中——并创建存储在中的结果索引一个元数据文件。

第三,我处理所有的错误。 req1, _ := http.NewRequest(... 似乎是一个方便的快捷方式,但它的真正含义是您最多不知道任何错误的真正原因。我通常在向上渗透时向错误添加一些描述性文本,以确保我可以轻松地分辨出 哪个 错误是我 returning.

最后,我return成功处理了URL,这样我就可以看到最终结果了。当扫描数百万个 URLS 时,您可能还需要一个失败的列表,但成功的计数是将最终数据发回以供汇总的良好开端。

package main

import (
    "bufio"
    "bytes"
    "fmt"
    "io"
    "log"
    "net/http"
    "net/url"
    "os"
    "path/filepath"
    "time"
)

const urls_text = `http://danf.us/
https://farrellit.net/?3=2&#1
`

func crawler(urls <-chan *url.URL, done chan<- int) {
    var processed int = 0
    defer func() { done <- processed }()
    client := http.Client{Timeout: 10 * time.Second}
    for u := range urls {
        if req, err := http.NewRequest("GET", u.String(), nil); err != nil {
            log.Printf("Couldn't create new request for %s: %s", u.String(), err.Error())
        } else {
            req.Header.Add("User-agent", "Mozilla/5.0 (X11; Linux i586; rv:31.0) Gecko/20100101 Firefox/74.0") // changing user-agent
            if res, err := client.Do(req); err != nil {
                log.Printf("Failed to get %s: %s", u.String(), err.Error())
            } else {
                filename := filepath.Base(u.EscapedPath())
                if filename == "/" || filename == "" {
                    filename = "response"
                } else {
                    log.Printf("URL Filename is '%s'", filename)
                }
                destpath := filepath.Join(
                    res.Status, u.Scheme, u.Hostname(), u.EscapedPath(),
                    fmt.Sprintf("?%s",u.RawQuery), fmt.Sprintf("#%s",u.Fragment), filename,
                )
                if err := os.MkdirAll(filepath.Dir(destpath), 0755); err != nil {
                    log.Printf("Couldn't create directory %s: %s", filepath.Dir(destpath), err.Error())
                } else if f, err := os.OpenFile(destpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
                    log.Printf("Couldn't open destination file %s: %s", destpath, err.Error())
                } else {
                    if b, err := io.Copy(f, res.Body); err != nil {
                        log.Printf("Could not copy %s body to %s: %s", u.String(), destpath, err.Error())
                    } else {
                        log.Printf("Copied %d bytes from body of %s to %s", b, u.String(), destpath)
                        processed++
                    }
                    f.Close()
                }
                res.Body.Close()
            }
        }
    }
}

const workers = 3

func main() {
    urls := make(chan *url.URL)
    done := make(chan int)
    var submitted int = 0
    var inputted int = 0
    var successful int = 0
    for i := 0; i < workers; i++ {
        go crawler(urls, done)
    }
    sc := bufio.NewScanner(bytes.NewBufferString(urls_text))
    for sc.Scan() {
        inputted++
        if u, err := url.Parse(sc.Text()); err != nil {
            log.Printf("Could not parse %s as url: %w", sc.Text(), err)
        } else {
            submitted++
            urls <- u
        }
    }
    close(urls)
    for i := 0; i < workers; i++ {
        successful += <-done
    }
    log.Printf("%d urls input, %d could not be parsed. %d/%d valid URLs successful (%.0f%%)",
        inputted, inputted-submitted,
        successful, submitted,
        float64(successful)/float64(submitted)*100.0,
    )
}