如何在 Go 中执行并发下载
How to perform concurrent downloads in Go
我们有一个流程,用户可以通过该流程请求我们需要从我们的来源获取的文件。此来源不是最可靠的,因此我们使用 Amazon SQS 实施了一个队列。我们将下载 URL 放入队列中,然后使用我们用 Go 编写的小应用程序对其进行轮询。这个应用程序只是检索消息,下载文件,然后将其推送到我们存储它的 S3。一旦所有这些都完成,它会回调一个服务,该服务将通过电子邮件通知用户文件已准备就绪。
最初我写这个是为了创建 n 个通道,然后为每个通道附加 1 个 go-routine,并让 go-routine 处于无限循环中。这样我就可以确保我一次只处理固定数量的下载。
我意识到这不是应该使用频道的方式,如果我现在理解正确的话,实际上应该有一个频道 n 去- 在该频道上接收的例程。每个 go-routine 都处于无限循环中,等待一条消息,当它接收到数据时,它会处理数据,做它应该做的一切,当它完成后,它会等待下一条消息。这使我能够确保我一次只处理 n 个文件。我认为这是正确的做法。我相信这是 扇出,对吧?
我不需要做的是将这些进程重新合并在一起。下载完成后,它会回调一个远程服务,以便处理剩余的过程。该应用程序无需执行任何其他操作。
好的,所以一些代码:
func main() {
queue, err := ConnectToQueue() // This works fine...
if err != nil {
log.Fatalf("Could not connect to queue: %s\n", err)
}
msgChannel := make(chan sqs.Message, 10)
for i := 0; i < MAX_CONCURRENT_ROUTINES; i++ {
go processMessage(msgChannel, queue)
}
for {
response, _ := queue.ReceiveMessage(MAX_SQS_MESSAGES)
for _, m := range response.Messages {
msgChannel <- m
}
}
}
func processMessage(ch <-chan sqs.Message, queue *sqs.Queue) {
for {
m := <-ch
// Do something with message m
// Delete message from queue when we're done
queue.DeleteMessage(&m)
}
}
我在附近吗?我有 n 运行 go-routines(其中 MAX_CONCURRENT_ROUTINES
= n)并且在循环中我们将继续传递消息进入单通道。这是正确的方法吗?我需要关闭任何东西还是可以无限期地离开这个 运行?
我注意到的一件事是 SQS 正在返回消息,但是一旦我将 10 条消息传递到 processMessage()
(10 是通道缓冲区的大小),实际上没有进一步的消息被处理.
谢谢大家
看起来不错。一些注意事项:
您可以通过限制生成的工作例程数量以外的方式来限制工作并行度。例如,您可以为收到的每条消息创建一个 goroutine,然后让生成的 goroutine 等待限制并行度的信号量。当然需要权衡取舍,但您不仅限于您所描述的方式。
sem := make(chan struct{}, n)
work := func(m sqs.Message) {
sem <- struct{}{} // When there's room we can proceed
// do the work
<-sem // Free room in the channel
}()
for _, m := range queue.ReceiveMessage(MAX_SQS_MESSAGES) {
for _, m0 := range m {
go work(m0)
}
}
只能处理 10 条消息的限制是由您的堆栈中的其他地方引起的。可能您看到的是前 10 个填充频道的比赛,然后工作没有完成,或者您可能不小心 return 从工人例程中退出。如果您的员工按照您描述的模型坚持不懈,您需要确定他们不会 return.
尚不清楚您是否希望进程在处理完一定数量的消息后 return。如果你确实希望这个过程退出,你需要等待所有的工作人员完成他们当前的任务,然后可能会向 return 发出信号。查看 sync.WaitGroup
以同步它们的完成,并使用另一个通道来表示没有更多工作,或关闭 msgChannel
,并在您的工作人员中处理。 (看看 2 元组 return 通道接收表达式。)
我们有一个流程,用户可以通过该流程请求我们需要从我们的来源获取的文件。此来源不是最可靠的,因此我们使用 Amazon SQS 实施了一个队列。我们将下载 URL 放入队列中,然后使用我们用 Go 编写的小应用程序对其进行轮询。这个应用程序只是检索消息,下载文件,然后将其推送到我们存储它的 S3。一旦所有这些都完成,它会回调一个服务,该服务将通过电子邮件通知用户文件已准备就绪。
最初我写这个是为了创建 n 个通道,然后为每个通道附加 1 个 go-routine,并让 go-routine 处于无限循环中。这样我就可以确保我一次只处理固定数量的下载。
我意识到这不是应该使用频道的方式,如果我现在理解正确的话,实际上应该有一个频道 n 去- 在该频道上接收的例程。每个 go-routine 都处于无限循环中,等待一条消息,当它接收到数据时,它会处理数据,做它应该做的一切,当它完成后,它会等待下一条消息。这使我能够确保我一次只处理 n 个文件。我认为这是正确的做法。我相信这是 扇出,对吧?
我不需要做的是将这些进程重新合并在一起。下载完成后,它会回调一个远程服务,以便处理剩余的过程。该应用程序无需执行任何其他操作。
好的,所以一些代码:
func main() {
queue, err := ConnectToQueue() // This works fine...
if err != nil {
log.Fatalf("Could not connect to queue: %s\n", err)
}
msgChannel := make(chan sqs.Message, 10)
for i := 0; i < MAX_CONCURRENT_ROUTINES; i++ {
go processMessage(msgChannel, queue)
}
for {
response, _ := queue.ReceiveMessage(MAX_SQS_MESSAGES)
for _, m := range response.Messages {
msgChannel <- m
}
}
}
func processMessage(ch <-chan sqs.Message, queue *sqs.Queue) {
for {
m := <-ch
// Do something with message m
// Delete message from queue when we're done
queue.DeleteMessage(&m)
}
}
我在附近吗?我有 n 运行 go-routines(其中 MAX_CONCURRENT_ROUTINES
= n)并且在循环中我们将继续传递消息进入单通道。这是正确的方法吗?我需要关闭任何东西还是可以无限期地离开这个 运行?
我注意到的一件事是 SQS 正在返回消息,但是一旦我将 10 条消息传递到 processMessage()
(10 是通道缓冲区的大小),实际上没有进一步的消息被处理.
谢谢大家
看起来不错。一些注意事项:
您可以通过限制生成的工作例程数量以外的方式来限制工作并行度。例如,您可以为收到的每条消息创建一个 goroutine,然后让生成的 goroutine 等待限制并行度的信号量。当然需要权衡取舍,但您不仅限于您所描述的方式。
sem := make(chan struct{}, n) work := func(m sqs.Message) { sem <- struct{}{} // When there's room we can proceed // do the work <-sem // Free room in the channel }() for _, m := range queue.ReceiveMessage(MAX_SQS_MESSAGES) { for _, m0 := range m { go work(m0) } }
只能处理 10 条消息的限制是由您的堆栈中的其他地方引起的。可能您看到的是前 10 个填充频道的比赛,然后工作没有完成,或者您可能不小心 return 从工人例程中退出。如果您的员工按照您描述的模型坚持不懈,您需要确定他们不会 return.
尚不清楚您是否希望进程在处理完一定数量的消息后 return。如果你确实希望这个过程退出,你需要等待所有的工作人员完成他们当前的任务,然后可能会向 return 发出信号。查看
sync.WaitGroup
以同步它们的完成,并使用另一个通道来表示没有更多工作,或关闭msgChannel
,并在您的工作人员中处理。 (看看 2 元组 return 通道接收表达式。)