使用 goroutines 下载文件?
downloading files with goroutines?
我是 Go 新手,正在学习如何使用 goroutines。
我有下载图片的功能:
func imageDownloader(uri string, filename string) {
fmt.Println("starting download for ", uri)
outFile, err := os.Create(filename)
defer outFile.Close()
if err != nil {
os.Exit(1)
}
client := &http.Client{}
req, err := http.NewRequest("GET", uri, nil)
resp, err := client.Do(req)
defer resp.Body.Close()
if err != nil {
panic(err)
}
header := resp.ContentLength
bar := pb.New(int(header))
rd := bar.NewProxyReader(resp.Body)
// and copy from reader
io.Copy(outFile, rd)
}
当我作为另一个函数的一部分单独调用时,它会完整下载图像并且没有截断数据。
但是,当我尝试修改它以使其成为 goroutine 时,图像经常被截断或零长度文件。
func imageDownloader(uri string, filename string, wg *sync.WaitGroup) {
...
io.Copy(outFile, rd)
wg.Done()
}
func main() {
var wg sync.WaitGroup
wg.Add(1)
go imageDownloader(url, file, &wg)
wg.Wait()
}
我是否错误地使用了 WaitGroups?是什么原因导致的,我该如何解决?
更新:
解决了。我已将 wg.add()
函数放在循环之外。 :(
虽然我不确定是什么导致了您的问题,但这里有两个选项可以帮助您恢复正常工作。
首先,从同步库中查找 example of how to use waitgroups,尝试在函数的开头调用 defer wg.Done()
以确保即使 goroutine 意外结束,等待组也会正确递减。
其次,io.Copy
returns 您没有检查的错误。无论如何,这不是很好的做法,但在您的特定情况下,它会阻止您查看复制例程中是否确实存在错误。检查并妥善处理。它还 returns 写入的字节数,这也可能对您有所帮助。
您的示例在使用 WaitGroups 方面没有任何明显的错误。只要您使用与启动的 goroutine 数量相同的数字调用 wg.Add()
,或者每次启动新的 goroutine 时将其递增 1,那应该是正确的。
无论您在 goroutine 中为某些错误条件调用 os.Exit
和 panic
,因此如果您有多个 运行,其中任何一个失败都会终止所有这些,不管是否使用 WaitGroups。如果它在没有恐慌消息的情况下失败,我会看一下 os.Exit(1)
行。
在函数开始时使用 defer wg.Done()
也是一个好习惯,这样即使发生错误,goroutine 仍会递减其计数器。这样,如果其中一个 goroutines returns 出错,您的主线程将不会挂起。
我想在你的例子中做的一个改变是在你 Done
时利用 defer
。我认为这个 defer ws.Done()
应该是你函数中的第一条语句。
我喜欢WaitGroup
的简洁。但是,我不喜欢我们需要将引用传递给 goroutine,因为这意味着并发逻辑将与您的业务逻辑混合。
所以我想出了这个通用函数来为我解决这个问题:
// Parallelize parallelizes the function calls
func Parallelize(functions ...func()) {
var waitGroup sync.WaitGroup
waitGroup.Add(len(functions))
defer waitGroup.Wait()
for _, function := range functions {
go func(copy func()) {
defer waitGroup.Done()
copy()
}(function)
}
}
所以你的例子可以这样解决:
func imageDownloader(uri string, filename string) {
...
io.Copy(outFile, rd)
}
func main() {
functions := []func(){}
list := make([]Object, 5)
for _, object := range list {
function := func(obj Object){
imageDownloader(object.uri, object.filename)
}(object)
functions = append(functions, function)
}
Parallelize(functions...)
fmt.Println("Done")
}
如果你想使用它,你可以在这里找到它https://github.com/shomali11/util
我是 Go 新手,正在学习如何使用 goroutines。
我有下载图片的功能:
func imageDownloader(uri string, filename string) {
fmt.Println("starting download for ", uri)
outFile, err := os.Create(filename)
defer outFile.Close()
if err != nil {
os.Exit(1)
}
client := &http.Client{}
req, err := http.NewRequest("GET", uri, nil)
resp, err := client.Do(req)
defer resp.Body.Close()
if err != nil {
panic(err)
}
header := resp.ContentLength
bar := pb.New(int(header))
rd := bar.NewProxyReader(resp.Body)
// and copy from reader
io.Copy(outFile, rd)
}
当我作为另一个函数的一部分单独调用时,它会完整下载图像并且没有截断数据。
但是,当我尝试修改它以使其成为 goroutine 时,图像经常被截断或零长度文件。
func imageDownloader(uri string, filename string, wg *sync.WaitGroup) {
...
io.Copy(outFile, rd)
wg.Done()
}
func main() {
var wg sync.WaitGroup
wg.Add(1)
go imageDownloader(url, file, &wg)
wg.Wait()
}
我是否错误地使用了 WaitGroups?是什么原因导致的,我该如何解决?
更新:
解决了。我已将 wg.add()
函数放在循环之外。 :(
虽然我不确定是什么导致了您的问题,但这里有两个选项可以帮助您恢复正常工作。
首先,从同步库中查找 example of how to use waitgroups,尝试在函数的开头调用 defer wg.Done()
以确保即使 goroutine 意外结束,等待组也会正确递减。
其次,io.Copy
returns 您没有检查的错误。无论如何,这不是很好的做法,但在您的特定情况下,它会阻止您查看复制例程中是否确实存在错误。检查并妥善处理。它还 returns 写入的字节数,这也可能对您有所帮助。
您的示例在使用 WaitGroups 方面没有任何明显的错误。只要您使用与启动的 goroutine 数量相同的数字调用 wg.Add()
,或者每次启动新的 goroutine 时将其递增 1,那应该是正确的。
无论您在 goroutine 中为某些错误条件调用 os.Exit
和 panic
,因此如果您有多个 运行,其中任何一个失败都会终止所有这些,不管是否使用 WaitGroups。如果它在没有恐慌消息的情况下失败,我会看一下 os.Exit(1)
行。
在函数开始时使用 defer wg.Done()
也是一个好习惯,这样即使发生错误,goroutine 仍会递减其计数器。这样,如果其中一个 goroutines returns 出错,您的主线程将不会挂起。
我想在你的例子中做的一个改变是在你 Done
时利用 defer
。我认为这个 defer ws.Done()
应该是你函数中的第一条语句。
我喜欢WaitGroup
的简洁。但是,我不喜欢我们需要将引用传递给 goroutine,因为这意味着并发逻辑将与您的业务逻辑混合。
所以我想出了这个通用函数来为我解决这个问题:
// Parallelize parallelizes the function calls
func Parallelize(functions ...func()) {
var waitGroup sync.WaitGroup
waitGroup.Add(len(functions))
defer waitGroup.Wait()
for _, function := range functions {
go func(copy func()) {
defer waitGroup.Done()
copy()
}(function)
}
}
所以你的例子可以这样解决:
func imageDownloader(uri string, filename string) {
...
io.Copy(outFile, rd)
}
func main() {
functions := []func(){}
list := make([]Object, 5)
for _, object := range list {
function := func(obj Object){
imageDownloader(object.uri, object.filename)
}(object)
functions = append(functions, function)
}
Parallelize(functions...)
fmt.Println("Done")
}
如果你想使用它,你可以在这里找到它https://github.com/shomali11/util