使用 goroutines 下载文件?

downloading files with goroutines?

我是 Go 新手,正在学习如何使用 goroutines。

我有下载图片的功能:

func imageDownloader(uri string, filename string) {
    fmt.Println("starting download for ", uri)

    outFile, err := os.Create(filename)
    defer outFile.Close()
    if err != nil {
        os.Exit(1)
    }

    client := &http.Client{}

    req, err := http.NewRequest("GET", uri, nil)

    resp, err := client.Do(req)
    defer resp.Body.Close()

    if err != nil {
        panic(err)
    }

    header := resp.ContentLength
    bar := pb.New(int(header))
    rd := bar.NewProxyReader(resp.Body)
    // and copy from reader
    io.Copy(outFile, rd)
}

当我作为另一个函数的一部分单独调用时,它会完整下载图像并且没有截断数据。

但是,当我尝试修改它以使其成为 goroutine 时,图像经常被截断或零长度文件。

func imageDownloader(uri string, filename string, wg *sync.WaitGroup) {
    ...
    io.Copy(outFile, rd)
    wg.Done()
}

func main() {
var wg sync.WaitGroup
wg.Add(1)
go imageDownloader(url, file, &wg)
wg.Wait()
}

我是否错误地使用了 WaitGroups?是什么原因导致的,我该如何解决?

更新:

解决了。我已将 wg.add() 函数放在循环之外。 :(

虽然我不确定是什么导致了您的问题,但这里有两个选项可以帮助您恢复正常工作。

首先,从同步库中查找 example of how to use waitgroups,尝试在函数的开头调用 defer wg.Done() 以确保即使 goroutine 意外结束,等待组也会正确递减。

其次,io.Copy returns 您没有检查的错误。无论如何,这不是很好的做法,但在您的特定情况下,它会阻止您查看复制例程中是否确实存在错误。检查并妥善处理。它还 returns 写入的字节数,这也可能对您有所帮助。

您的示例在使用 WaitGroups 方面没有任何明显的错误。只要您使用与启动的 goroutine 数量相同的数字调用 wg.Add(),或者每次启动新的 goroutine 时将其递增 1,那应该是正确的。

无论您在 goroutine 中为某些错误条件调用 os.Exitpanic,因此如果您有多个 运行,其中任何一个失败都会终止所有这些,不管是否使用 WaitGroups。如果它在没有恐慌消息的情况下失败,我会看一下 os.Exit(1) 行。

在函数开始时使用 defer wg.Done() 也是一个好习惯,这样即使发生错误,goroutine 仍会递减其计数器。这样,如果其中一个 goroutines returns 出错,您的主线程将不会挂起。

我想在你的例子中做的一个改变是在你 Done 时利用 defer。我认为这个 defer ws.Done() 应该是你函数中的第一条语句。

我喜欢WaitGroup的简洁。但是,我不喜欢我们需要将引用传递给 goroutine,因为这意味着并发逻辑将与您的业务逻辑混合。

所以我想出了这个通用函数来为我解决这个问题:

// Parallelize parallelizes the function calls
func Parallelize(functions ...func()) {
    var waitGroup sync.WaitGroup
    waitGroup.Add(len(functions))

    defer waitGroup.Wait()

    for _, function := range functions {
        go func(copy func()) {
            defer waitGroup.Done()
            copy()
        }(function)
    }
}

所以你的例子可以这样解决:

func imageDownloader(uri string, filename string) {
    ...
    io.Copy(outFile, rd)
}

func main() {
    functions := []func(){}
    list := make([]Object, 5)
    for _, object := range list {
        function := func(obj Object){ 
            imageDownloader(object.uri, object.filename) 
        }(object)
        functions = append(functions, function)
    }

    Parallelize(functions...)        

    fmt.Println("Done")
}

如果你想使用它,你可以在这里找到它https://github.com/shomali11/util