通过错误组处理 goroutine 终止和错误处理?

Handle goroutine termination and error handling via error group?

我正在尝试以这种方式并行读取多个文件,以便每个正在读取文件的 go 例程将其数据写入该通道,然后有一个 go-routine 监听该通道并添加数据到地图。这是我的 play.

下面是剧中的例子:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var myFiles = []string{"file1", "file2", "file3"}
    var myMap = make(map[string][]byte)
    dataChan := make(chan fileData, len(myFiles))
    wg := sync.WaitGroup{}
    defer close(dataChan)
    // we create a wait group of N
    wg.Add(len(myFiles))
    for _, file := range myFiles {
        // we create N go-routines, one per file, each one will return a struct containing their filename and bytes from
        // the file via the dataChan channel
        go getBytesFromFile(file, dataChan, &wg)
    }
    // we wait until the wait group is decremented to zero by each instance of getBytesFromFile() calling waitGroup.Done()
    wg.Wait()
    for i := 0; i < len(myFiles); i++ {
        // we can now read from the data channel N times.
        file := <-dataChan
        myMap[file.name] = file.bytes
    }
    fmt.Printf("%+v\n", myMap)
}

type fileData struct {
    name  string
    bytes []byte
}

// how to handle error from this method if reading file got messed up?
func getBytesFromFile(file string, dataChan chan fileData, waitGroup *sync.WaitGroup) {
    bytes := openFileAndGetBytes(file)
    dataChan <- fileData{name: file, bytes: bytes}
    waitGroup.Done()
}

func openFileAndGetBytes(file string) []byte {
    return []byte(fmt.Sprintf("these are some bytes for file %s", file))
}

问题陈述

如何使用 golang.org/x/sync/errgroup 来等待和处理来自 goroutine 的错误,或者是否有更好的方法,比如使用信号量?例如,如果我的任何一个 go 例程无法从文件中读取数据,那么我想取消在任何一个例程返回错误的情况下剩余的所有那些(在这种情况下,该错误是返回给调用者的一个气泡)。对于成功案例,它应该自动等待所有提供的 go 例程成功完成。

如果文件总数为 100,我也不想生成 100 个 go-routines。如果有任何方法,我想尽可能地控制并行度。

How can I use golang.org/x/sync/errgroup to wait on and handle errors from goroutines or if there is any better way like using semaphore? For example [...] I want to cancels all those remaining in the case of any one routine returning an error (in which case that error is the one bubble back up to the caller). And it should automatically waits for all the supplied go routines to complete successfully for success case.

有很多方法可以在 goroutine 之间传递错误状态。 errgroup 虽然做了很多繁重的工作,但适合这种情况。否则你最终会执行同样的事情。

要使用 errgroup,我们需要处理错误(并为您的演示生成一些错误)。此外,要取消现有的 goroutine,我们将使用来自 errgroup.NewWithContext.

的上下文

来自错误组参考,

Package errgroup provides synchronization, error propagation, and Context cancelation for groups of goroutines working on subtasks of a common task.

您的游戏不支持任何错误处理。如果我们不做任何错误处理,我们就不能收集和取消错误。所以我添加了一些代码来注入错误处理:

func openFileAndGetBytes(file string) (string, error) {
    if file == "file2" {
        return "", fmt.Errorf("%s cannot be read", file)
    }
    return fmt.Sprintf("these are some bytes for file %s", file), nil
}

然后该错误也必须从 getBytesFromFile 返回:

func getBytesFromFile(file string, dataChan chan fileData) error {
    bytes, err := openFileAndGetBytes(file)
    if err == nil {
        dataChan <- fileData{name: file, bytes: bytes}
    }
    return err
}

现在我们已经完成了,我们可以将注意力转移到我们将如何启动一些 goroutine 上。

I also don't want to spawn 100 go-routines if total number of files is 100. I want to control the parallelism if possible if there is any way.

写得好,任务数、通道大小和工人数通常是独立的值。诀窍是使用通道关闭 - 在您的情况下,上下文取消 - 在 goroutine 之间传递状态。我们需要一个额外的文件名分发通道,以及一个额外的 goroutine 来收集结果。

为了说明这一点,我的代码使用了 3 个工人,并添加了几个文件。我的频道是无缓冲的。这使我们可以看到一些文件得到处理,而另一些文件被中止。如果您缓冲通道,该示例仍然有效,但更有可能在处理取消之前处理额外的工作。试验缓冲区大小以及工作人员数量和要处理的文件数量。

    var myFiles = []string{"file1", "file2", "file3", "file4", "file5", "file6"}
    fileChan := make(chan string)
    dataChan := make(chan fileData)

为了启动 worker,我们不是为每个文件启动一个,而是启动我们想要的数字 - 这里是 3。

    for i := 0; i < 3; i++ {
        worker_num := i
        g.Go(func() error {
            for file := range fileChan {
                if err := getBytesFromFile(file, dataChan); err != nil {
                    fmt.Println("worker", worker_num, "failed to process", file, ":", err.Error())
                    return err
                } else if err := ctx.Err(); err != nil {
                    fmt.Println("worker", worker_num, "context error in worker:", err.Error())
                    return err
                }
            }
            fmt.Println("worker", worker_num, "processed all work on channel")
            return nil

        })
    }

工作人员调用您的 getBytesFromFile 函数。如果它 return 是一个错误,我们 return 一个错误。在这种情况下,errgroup 将自动取消我们的上下文。但是,操作的确切顺序是不确定的,因此在取消上下文之前可能会或可能不会处理更多文件。我将在下面展示几种可能性。

通过 rangefileChan 上,工作人员自动从通道关闭中获取工作结束。如果我们得到一个错误,我们可以立即 return 到 errgroup。否则,如果上下文已被取消,我们可以return立即取消错误。

您可能认为 g.Go 会自动取消我们的功能。但它不能。除了进程终止之外,无法取消 Go 中的 运行ning 函数。 errgroup.Group.Go 的函数参数必须根据其上下文的状态在适当的时候自行取消。

现在我们可以将注意力转移到将文件放在 fileChan 上的那个东西上了。我们在这里有 2 个选项:我们可以像您一样使用大小为 myFiles 的缓冲通道。我们可以用待处理的作业填充整个通道。只有当您在创建频道时知道作业数量时,这才是一个选项。另一种选择是使用一个额外的“分发”goroutine,它可以阻止对 fileChan 的写入,以便我们的“主”goroutine 可以继续。

    // dispatch files
    g.Go(func() error {
        defer close(fileChan)
        done := ctx.Done()
        for _, file := range myFiles {
            select {
            case fileChan <- file:
                continue
            case <-done:
                break
            }
        }
        return ctx.Err()
    })

我不确定在这种情况下是否有必要将它放在同一个错误组中,因为我们无法在分发器 goroutine 中获取错误。但是这种从 Pipeline example from errgroup 中提取的一般模式无论工作调度程序是否会产生错误都有效。

这个功能非常简单,但神奇之处在于 selectctx.Done() 频道。我们要么写入工作通道,要么 如果我们的上下文完成,我们就会失败。这允许我们在一个工作人员失败一个文件时停止分发工作。

我们 defer close(fileChan) 这样,无论我们为什么完成(我们分配了所有工作,或者上下文被取消),工作人员都知道传入的工作队列中不会再有工作(即fileChan).

我们还需要一种同步机制:一旦所有的工作都被分发,并且所有的结果都在或工作完成被取消,(例如,在我们的错误组的Wait() returns之后) ,我们需要关闭我们的结果通道,dataChan。这向结果收集器发出信号,表明没有更多结果要收集。

    var err error // we'll need this later!
    go func() {
        err = g.Wait()
        close(dataChan)
    }()

我们不能 - 也不需要 - 将它放在 errgroup.Group 中。函数不能 return 出错,也不能等待自己到 close(dataChan)。所以它进入一个常规的旧 goroutine,sans errgroup.

我们终于可以收集结果了。有了专用的 worker goroutines,一个 distributor goroutine 和一个 goroutine 等待工作并通知将不再写入 dataChan,我们可以在 [=45] 的“主要”goroutine 中收集所有结果=].

    for data := range dataChan {
        myMap[data.name] = data.bytes
    }
    if err != nil { // this was set in our final goroutine, remember
        fmt.Println("errgroup Error:", err.Error())
    }

我做了一些小改动,以便更容易看到输出。您可能已经注意到我将文件内容从 []byte 更改为 string。这纯粹是为了让结果易于阅读。也为此目的,我使用 encoding/json 格式化结果,以便于阅读并将它们粘贴到 SO 中。这是我经常用来缩进结构化数据的常见模式:

    enc := json.NewEncoder(os.Stdout)
    enc.SetIndent("", " ")
    if err := enc.Encode(myMap); err != nil {
        panic(err)
    }

我们终于准备好了 运行。现在我们可以看到许多不同的结果,具体取决于 goroutines 执行的顺序。但它们都是有效的执行路径。

worker 2 failed to process file2 : file2 cannot be read
worker 0 context error in worker: context canceled
worker 1 context error in worker: context canceled
errgroup Error: file2 cannot be read
{
 "file1": "these are some bytes for file file1",
 "file3": "these are some bytes for file file3"
}

Program exited.

在此结果中,剩余的工作(file4file5)未添加到频道。请记住,无缓冲通道不存储任何数据。对于要写入通道的任务,工作人员必须在那里阅读它们。相反,上下文在 file2 失败后被取消,分布函数遵循其 select 中的 <-done 路径。 file1file3 已处理。

这是一个不同的结果(我只是 运行 游乐场分享了几次以获得不同的结果)。

worker 1 failed to process file2 : file2 cannot be read
worker 2 processed all work on channel
worker 0 processed all work on channel
errgroup Error: file2 cannot be read
{
 "file1": "these are some bytes for file file1",
 "file3": "these are some bytes for file file3",
 "file4": "these are some bytes for file file4",
 "file5": "these are some bytes for file file5",
 "file6": "these are some bytes for file file6"
}

在这种情况下,我们的取消看起来有点像失败了。但真正发生的是,goroutines 只是“碰巧”排队并在 errorgroup 发现 worker 失败并取消上下文之前完成剩余的工作。

错误组的作用

当您使用错误组时,您实际上从中得到了两件事:

  • 轻松访问您的工作人员的第一个错误 returned;
  • 获取 errorgroup 将在
  • 时为您取消的上下文

请记住,errorgroup 不会 取消 goroutine。起初这让我有点不高兴。错误组取消上下文。您有责任将该上下文的状态应用到您的 goroutines(请记住,goroutine 必须自行结束,errorgroup 不能结束它)。

关于文件操作上下文和未完成工作的最后一个旁白

您的大部分文件操作,例如io.Copyos.ReadFile,实际上是后续Read 操作的循环。但是 ioos 不直接支持上下文。所以如果你有一个工作人员正在读取一个文件,而你自己没有实现 Read 循环,你将没有机会根据上下文取消。在你的情况下这可能没问题 - 当然,你可能已经阅读了比你真正需要的更多的文件,但这只是因为你已经在错误发生时阅读了它们。我个人会接受这种情况,不会实现我自己的读取循环。

密码

https://go.dev/play/p/9qfESp_eB-C

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "os"

    "golang.org/x/sync/errgroup"
)

func main() {
    var myFiles = []string{"file1", "file2", "file3", "file4", "file5", "file6"}
    fileChan := make(chan string)
    dataChan := make(chan fileData)
    g, ctx := errgroup.WithContext(context.Background())
    for i := 0; i < 3; i++ {
        worker_num := i
        g.Go(func() error {
            for file := range fileChan {
                if err := getBytesFromFile(file, dataChan); err != nil {
                    fmt.Println("worker", worker_num, "failed to process", file, ":", err.Error())
                    return err
                } else if err := ctx.Err(); err != nil {
                    fmt.Println("worker", worker_num, "context error in worker:", err.Error())
                    return err
                }
            }
            fmt.Println("worker", worker_num, "processed all work on channel")
            return nil

        })
    }
    // dispatch files
    g.Go(func() error {
        defer close(fileChan)
        done := ctx.Done()
        for _, file := range myFiles {
            if err := ctx.Err(); err != nil {
                return err
            }
            select {
            case fileChan <- file:
                continue
            case <-done:
                break
            }
        }
        return ctx.Err()
    })
    var err error
    go func() {
        err = g.Wait()
        close(dataChan)
    }()
    var myMap = make(map[string]string)

    for data := range dataChan {
        myMap[data.name] = data.bytes
    }
    if err != nil {
        fmt.Println("errgroup Error:", err.Error())
    }
    enc := json.NewEncoder(os.Stdout)
    enc.SetIndent("", " ")
    if err := enc.Encode(myMap); err != nil {
        panic(err)
    }
}

type fileData struct {
    name,
    bytes string
}

func getBytesFromFile(file string, dataChan chan fileData) error {
    bytes, err := openFileAndGetBytes(file)
    if err == nil {
        dataChan <- fileData{name: file, bytes: bytes}
    }
    return err
}

func openFileAndGetBytes(file string) (string, error) {
    if file == "file2" {
        return "", fmt.Errorf("%s cannot be read", file)
    }
    return fmt.Sprintf("these are some bytes for file %s", file), nil
}