Go队列处理失败重试

Go queue processing with retry on failure

我们有一堆文件要在处理后上传到远程 blob 存储。

目前,前端 (PHP) 创建了此类文件的 redis 列表,并为其提供了一个唯一 ID,称为 JobID。然后它将唯一 ID 传递给 beanstalk 管,由 Go 进程接收。它使用名为 Go workers 的库以 net/http 所做的方式处理每个作业 ID。它接收作业 ID,检索 redis 列表并开始处理文件。

但是,目前一次只能处理一个文件。由于这里的操作是 I/O 绑定的,而不是 CPU 绑定的,直觉表明每个文件使用一个 goroutine 是有益的。

但是,我们想在失败时重试上传,并跟踪每个作业处理的项目数。我们不能启动无限数量的 goroutines,因为单个作业可以包含大约 10k 个要处理的文件,并且在高峰时段每秒可以发送 100 个这样的作业。正确的做法是什么?

注意:如果需要,我们可以稍微更改技术堆栈(例如将 beanstalkd 换成某些东西)

您可以通过使用缓冲 chan 来限制 goroutine 的数量,其大小为您想要的 goroutine 的最大数量。如果达到最大容量,您可以阻止此 chan。当您的 goroutines 完成时,它们将释放插槽以允许新的 goroutines 运行.

示例:

package main

import (
    "fmt"
    "sync"
)

var (
    concurrent    = 5
    semaphoreChan = make(chan struct{}, concurrent)
)

func doWork(wg *sync.WaitGroup, item int) {
    // block while full
    semaphoreChan <- struct{}{}

    go func() {
        defer func() {
            // read to release a slot
            <-semaphoreChan
            wg.Done()
        }()
        // This is where your work actually gets done
        fmt.Println(item)
    }()
}

func main() {
    // we need this for the example so that we can block until all goroutines finish
    var wg sync.WaitGroup
    wg.Add(10)

    // start the work
    for i := 0; i < 10; i++ {
        doWork(&wg, i)
    }

    // block until all work is done
    wg.Wait()
}

去游乐场link:https://play.golang.org/p/jDMYuCe7HV

受此 Golang 英国会议演讲的启发:https://youtu.be/yeetIgNeIkc?t=1413