Go 中的简单工作池

Simple worker pool in Go

我正在尝试在 go 中实现一个简单的工作线程池,并将 运行 放入问题中。我只想让一定数量的工人完成一定数量的工作,然后再做更多的工作。我使用的代码类似于:

    jobs := make(chan imageMessage, 1)
    results := make(chan imageMessage, 1)

    for w := 0; w < 2; w++ {
        go worker(jobs, results)
    }

    for j := 0; j < len(images); j++ {
        jobs <- imageMessage{path: paths[j], img: images[j]}
    }
    close(jobs)

    for r := 0; r < len(images); r++ {
        <-results
    }
}

func worker(jobs <-chan imageMessage, results chan<- imageMessage) {
    for j := range jobs {
        processImage(j.path, j.img)
        results <- j
    }
}

我的理解是,这应该创建 2 个工人,他们一次可以做 1 个 "thing",并且当他们完成那 1 个事情时,他们将继续得到更多的工作,直到没有其他事情可做。但是,我得到 fatal error: all goroutines are asleep - deadlock!

如果我将缓冲区设置为 100 之类的大值,这会起作用,但我希望能够限制一次完成的工作。

我觉得我很接近,但显然缺少一些东西。

问题是,一旦您成功发送 jobs 频道上的所有作业,您只启动 "draining" results 频道。但是为了能够发送所有作业,jobs 通道必须有足够大的缓冲区,或者 worker goroutines 必须能够从中使用作业。

但是一个 worker goroutines 在消费一个工作时,在它可以接受下一个工作之前,将结果发送到 results 通道。如果results通道缓冲区已满,发送结果会阻塞。

但是最后一部分——一个工作 goroutine 在发送结果时被阻塞——只能通过从 results 通道接收来 "unblocked"——直到你可以发送所有的工作你才知道.如果 jobs 通道和 results 通道的缓冲区无法容纳您的所有作业,则会出现死锁。这也解释了为什么如果将缓冲区大小增加到一个大值它会起作用:如果作业可以放入缓冲区,则不会发生死锁,并且在成功发送所有作业后,您的最终循环将耗尽 results频道。

解决办法? 运行 在自己的 goroutine 中生成和发送作业,因此您可以开始从 results 通道接收 "immediately" 而无需等待发送所有作业,这意味着 worker goroutines 不会尝试发送结果时被永远阻止:

go func() {
    for j := 0; j < len(images); j++ {
        jobs <- imageMessage{path: paths[j], img: images[j]}
    }
    close(jobs)
}()

Go Playground 上试用。

另请查看

中的类似实现

你可以使用这个工人。简单高效。 https://github.com/tamnguyenvt/go-worker

NewWorkerManager(WorkerManagerParams{
    WorkerSize: <number of workers>,
    RelaxAfter: <sleep for awhile to relax server after given duration>,
    RelaxDuration: <relax duration>,
    WorkerFunc: <your worker function here>,
    LogEnable: <enable log or not>,
    StopTimeout: <timeout all workers after given duration>,
}