Go 中的简单工作池
Simple worker pool in Go
我正在尝试在 go 中实现一个简单的工作线程池,并将 运行 放入问题中。我只想让一定数量的工人完成一定数量的工作,然后再做更多的工作。我使用的代码类似于:
jobs := make(chan imageMessage, 1)
results := make(chan imageMessage, 1)
for w := 0; w < 2; w++ {
go worker(jobs, results)
}
for j := 0; j < len(images); j++ {
jobs <- imageMessage{path: paths[j], img: images[j]}
}
close(jobs)
for r := 0; r < len(images); r++ {
<-results
}
}
func worker(jobs <-chan imageMessage, results chan<- imageMessage) {
for j := range jobs {
processImage(j.path, j.img)
results <- j
}
}
我的理解是,这应该创建 2 个工人,他们一次可以做 1 个 "thing",并且当他们完成那 1 个事情时,他们将继续得到更多的工作,直到没有其他事情可做。但是,我得到 fatal error: all goroutines are asleep - deadlock!
如果我将缓冲区设置为 100 之类的大值,这会起作用,但我希望能够限制一次完成的工作。
我觉得我很接近,但显然缺少一些东西。
问题是,一旦您成功发送 jobs
频道上的所有作业,您只启动 "draining" results
频道。但是为了能够发送所有作业,jobs
通道必须有足够大的缓冲区,或者 worker goroutines 必须能够从中使用作业。
但是一个 worker goroutines 在消费一个工作时,在它可以接受下一个工作之前,将结果发送到 results
通道。如果results
通道缓冲区已满,发送结果会阻塞。
但是最后一部分——一个工作 goroutine 在发送结果时被阻塞——只能通过从 results
通道接收来 "unblocked"——直到你可以发送所有的工作你才知道.如果 jobs
通道和 results
通道的缓冲区无法容纳您的所有作业,则会出现死锁。这也解释了为什么如果将缓冲区大小增加到一个大值它会起作用:如果作业可以放入缓冲区,则不会发生死锁,并且在成功发送所有作业后,您的最终循环将耗尽 results
频道。
解决办法? 运行 在自己的 goroutine 中生成和发送作业,因此您可以开始从 results
通道接收 "immediately" 而无需等待发送所有作业,这意味着 worker goroutines 不会尝试发送结果时被永远阻止:
go func() {
for j := 0; j < len(images); j++ {
jobs <- imageMessage{path: paths[j], img: images[j]}
}
close(jobs)
}()
在 Go Playground 上试用。
另请查看
中的类似实现
你可以使用这个工人。简单高效。 https://github.com/tamnguyenvt/go-worker
NewWorkerManager(WorkerManagerParams{
WorkerSize: <number of workers>,
RelaxAfter: <sleep for awhile to relax server after given duration>,
RelaxDuration: <relax duration>,
WorkerFunc: <your worker function here>,
LogEnable: <enable log or not>,
StopTimeout: <timeout all workers after given duration>,
}
我正在尝试在 go 中实现一个简单的工作线程池,并将 运行 放入问题中。我只想让一定数量的工人完成一定数量的工作,然后再做更多的工作。我使用的代码类似于:
jobs := make(chan imageMessage, 1)
results := make(chan imageMessage, 1)
for w := 0; w < 2; w++ {
go worker(jobs, results)
}
for j := 0; j < len(images); j++ {
jobs <- imageMessage{path: paths[j], img: images[j]}
}
close(jobs)
for r := 0; r < len(images); r++ {
<-results
}
}
func worker(jobs <-chan imageMessage, results chan<- imageMessage) {
for j := range jobs {
processImage(j.path, j.img)
results <- j
}
}
我的理解是,这应该创建 2 个工人,他们一次可以做 1 个 "thing",并且当他们完成那 1 个事情时,他们将继续得到更多的工作,直到没有其他事情可做。但是,我得到 fatal error: all goroutines are asleep - deadlock!
如果我将缓冲区设置为 100 之类的大值,这会起作用,但我希望能够限制一次完成的工作。
我觉得我很接近,但显然缺少一些东西。
问题是,一旦您成功发送 jobs
频道上的所有作业,您只启动 "draining" results
频道。但是为了能够发送所有作业,jobs
通道必须有足够大的缓冲区,或者 worker goroutines 必须能够从中使用作业。
但是一个 worker goroutines 在消费一个工作时,在它可以接受下一个工作之前,将结果发送到 results
通道。如果results
通道缓冲区已满,发送结果会阻塞。
但是最后一部分——一个工作 goroutine 在发送结果时被阻塞——只能通过从 results
通道接收来 "unblocked"——直到你可以发送所有的工作你才知道.如果 jobs
通道和 results
通道的缓冲区无法容纳您的所有作业,则会出现死锁。这也解释了为什么如果将缓冲区大小增加到一个大值它会起作用:如果作业可以放入缓冲区,则不会发生死锁,并且在成功发送所有作业后,您的最终循环将耗尽 results
频道。
解决办法? 运行 在自己的 goroutine 中生成和发送作业,因此您可以开始从 results
通道接收 "immediately" 而无需等待发送所有作业,这意味着 worker goroutines 不会尝试发送结果时被永远阻止:
go func() {
for j := 0; j < len(images); j++ {
jobs <- imageMessage{path: paths[j], img: images[j]}
}
close(jobs)
}()
在 Go Playground 上试用。
另请查看
你可以使用这个工人。简单高效。 https://github.com/tamnguyenvt/go-worker
NewWorkerManager(WorkerManagerParams{
WorkerSize: <number of workers>,
RelaxAfter: <sleep for awhile to relax server after given duration>,
RelaxDuration: <relax duration>,
WorkerFunc: <your worker function here>,
LogEnable: <enable log or not>,
StopTimeout: <timeout all workers after given duration>,
}