Go队列处理失败重试
Go queue processing with retry on failure
我们有一堆文件要在处理后上传到远程 blob 存储。
目前,前端 (PHP) 创建了此类文件的 redis 列表,并为其提供了一个唯一 ID,称为 JobID。然后它将唯一 ID 传递给 beanstalk 管,由 Go 进程接收。它使用名为 Go workers 的库以 net/http
所做的方式处理每个作业 ID。它接收作业 ID,检索 redis 列表并开始处理文件。
但是,目前一次只能处理一个文件。由于这里的操作是 I/O 绑定的,而不是 CPU 绑定的,直觉表明每个文件使用一个 goroutine 是有益的。
但是,我们想在失败时重试上传,并跟踪每个作业处理的项目数。我们不能启动无限数量的 goroutines,因为单个作业可以包含大约 10k 个要处理的文件,并且在高峰时段每秒可以发送 100 个这样的作业。正确的做法是什么?
注意:如果需要,我们可以稍微更改技术堆栈(例如将 beanstalkd 换成某些东西)
您可以通过使用缓冲 chan
来限制 goroutine 的数量,其大小为您想要的 goroutine 的最大数量。如果达到最大容量,您可以阻止此 chan
。当您的 goroutines 完成时,它们将释放插槽以允许新的 goroutines 运行.
示例:
package main
import (
"fmt"
"sync"
)
var (
concurrent = 5
semaphoreChan = make(chan struct{}, concurrent)
)
func doWork(wg *sync.WaitGroup, item int) {
// block while full
semaphoreChan <- struct{}{}
go func() {
defer func() {
// read to release a slot
<-semaphoreChan
wg.Done()
}()
// This is where your work actually gets done
fmt.Println(item)
}()
}
func main() {
// we need this for the example so that we can block until all goroutines finish
var wg sync.WaitGroup
wg.Add(10)
// start the work
for i := 0; i < 10; i++ {
doWork(&wg, i)
}
// block until all work is done
wg.Wait()
}
去游乐场link:https://play.golang.org/p/jDMYuCe7HV
受此 Golang 英国会议演讲的启发:https://youtu.be/yeetIgNeIkc?t=1413
我们有一堆文件要在处理后上传到远程 blob 存储。
目前,前端 (PHP) 创建了此类文件的 redis 列表,并为其提供了一个唯一 ID,称为 JobID。然后它将唯一 ID 传递给 beanstalk 管,由 Go 进程接收。它使用名为 Go workers 的库以 net/http
所做的方式处理每个作业 ID。它接收作业 ID,检索 redis 列表并开始处理文件。
但是,目前一次只能处理一个文件。由于这里的操作是 I/O 绑定的,而不是 CPU 绑定的,直觉表明每个文件使用一个 goroutine 是有益的。
但是,我们想在失败时重试上传,并跟踪每个作业处理的项目数。我们不能启动无限数量的 goroutines,因为单个作业可以包含大约 10k 个要处理的文件,并且在高峰时段每秒可以发送 100 个这样的作业。正确的做法是什么?
注意:如果需要,我们可以稍微更改技术堆栈(例如将 beanstalkd 换成某些东西)
您可以通过使用缓冲 chan
来限制 goroutine 的数量,其大小为您想要的 goroutine 的最大数量。如果达到最大容量,您可以阻止此 chan
。当您的 goroutines 完成时,它们将释放插槽以允许新的 goroutines 运行.
示例:
package main
import (
"fmt"
"sync"
)
var (
concurrent = 5
semaphoreChan = make(chan struct{}, concurrent)
)
func doWork(wg *sync.WaitGroup, item int) {
// block while full
semaphoreChan <- struct{}{}
go func() {
defer func() {
// read to release a slot
<-semaphoreChan
wg.Done()
}()
// This is where your work actually gets done
fmt.Println(item)
}()
}
func main() {
// we need this for the example so that we can block until all goroutines finish
var wg sync.WaitGroup
wg.Add(10)
// start the work
for i := 0; i < 10; i++ {
doWork(&wg, i)
}
// block until all work is done
wg.Wait()
}
去游乐场link:https://play.golang.org/p/jDMYuCe7HV
受此 Golang 英国会议演讲的启发:https://youtu.be/yeetIgNeIkc?t=1413