这是 Go 中惯用的工作线程池吗?
Is this an idiomatic worker thread pool in Go?
我正在尝试使用 goroutines 编写一个简单的工作池。
- 我写的代码是地道的吗?如果不是,那应该改变什么?
- 我希望能够将工作线程的最大数量设置为 5,并在所有 5 个都忙时阻塞直到有一个工作线程可用。我如何将其扩展到最多只有 5 个工人?我是否生成静态 5 goroutines,并给每个 goroutines
work_channel
?
代码:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func worker(id string, work string, o chan string, wg *sync.WaitGroup) {
defer wg.Done()
sleepMs := rand.Intn(1000)
fmt.Printf("worker '%s' received: '%s', sleep %dms\n", id, work, sleepMs)
time.Sleep(time.Duration(sleepMs) * time.Millisecond)
o <- work + fmt.Sprintf("-%dms", sleepMs)
}
func main() {
var work_channel = make(chan string)
var results_channel = make(chan string)
// create goroutine per item in work_channel
go func() {
var c = 0
var wg sync.WaitGroup
for work := range work_channel {
wg.Add(1)
go worker(fmt.Sprintf("%d", c), work, results_channel, &wg)
c++
}
wg.Wait()
fmt.Println("closing results channel")
close(results_channel)
}()
// add work to the work_channel
go func() {
for c := 'a'; c < 'z'; c++ {
work_channel <- fmt.Sprintf("%c", c)
}
close(work_channel)
fmt.Println("sent work to work_channel")
}()
for x := range results_channel {
fmt.Printf("result: %s\n", x)
}
}
你可以实现一个计数信号量来限制 goroutine 并发。
var tokens = make(chan struct{}, 20)
func worker(id string, work string, o chan string, wg *sync.WaitGroup) {
defer wg.Done()
tokens <- struct{}{} // acquire a token before performing work
sleepMs := rand.Intn(1000)
fmt.Printf("worker '%s' received: '%s', sleep %dms\n", id, work, sleepMs)
time.Sleep(time.Duration(sleepMs) * time.Millisecond)
<-tokens // release the token
o <- work + fmt.Sprintf("-%dms", sleepMs)
}
这是一般用来限制worker数量的设计。您当然可以更改 releasing/acquiring 标记的位置以适合您的代码。
您的解决方案在任何意义上都不是工作协程池:您的代码不限制并发协程,也不 "reuse" 协程(它总是在收到新作业时启动一个新协程)。
生产者消费者模式
发布于 Bruteforce MD5 Password cracker, you can make use of the producer-consumer pattern。你可以有一个指定的 producer goroutine 来生成作业(要做的事情/计算),并将它们发送到 jobs 频道。您可以有一个固定的 consumer goroutines 池(例如其中的 5 个),它们将在交付作业的通道上循环,并且每个 goroutines 将执行/完成接收到的作业。
producer goroutine 可以在所有作业生成并发送后简单地关闭 jobs
通道,正确地向 consumers 发出信号不会再有工作了。通道上的 for ... range
构造处理 "close" 事件并正确终止。请注意,关闭频道之前发送的所有作业仍将被传送。
这将导致一个干净的设计,将导致固定(但任意)数量的 goroutines,并且它将始终利用 100% CPU(如果 goroutines 的数量大于 [=173 的数量=] 核心)。它还具有一个优点,即可以正确选择通道容量(缓冲通道)和消费者的数量 goroutines。
,它可以为"throttled"。
请注意,具有指定生产者 goroutine 的此模型不是强制性的。您也可以有多个 goroutines 来生成作业,但是您也必须同步它们以仅在所有生产者 goroutine 完成生成作业时关闭 jobs
通道 - 否则尝试在 jobs
通道上发送另一个作业当它已经关闭时会导致运行时恐慌。通常生产作业的成本很低,并且生产速度比执行速度快得多,因此这种在 1 个 goroutine 中生产作业同时许多人正在使用/执行它们的模型在实践中是很好的。
处理结果:
如果作业有结果,您可以选择指定 结果 渠道来交付结果 ("sent back"),或者您可以选择处理当工作完成/完成时,结果会出现在消费者中。后者甚至可以通过具有处理结果的 "callback" 函数来实现。重要的是结果是可以独立处理还是需要合并(例如 map-reduce 框架)或聚合。
如果你使用 results
通道,你还需要一个 goroutine 从它接收值,防止消费者被阻塞(如果 results
的缓冲区被填满就会发生)。
有results
频道
我不会将简单的 string
值作为作业和结果发送,而是创建一个可以包含任何附加信息的包装器类型,因此它更加灵活:
type Job struct {
Id int
Work string
Result string
}
注意 Job
结构也包装了结果,所以当我们发回结果时,它也包含原始的 Job
作为上下文 - 通常非常有用。另请注意,仅在通道上发送指针 (*Job
) 而不是 Job
值是有利可图的,因此无需制作 Job
的 "countless" 副本,以及Job
结构值的大小变得无关紧要。
下面是这个生产者-消费者的样子:
我会使用 2 个 sync.WaitGroup
值,它们的作用如下:
var wg, wg2 sync.WaitGroup
生产者负责生成要执行的作业:
func produce(jobs chan<- *Job) {
// Generate jobs:
id := 0
for c := 'a'; c <= 'z'; c++ {
id++
jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)}
}
close(jobs)
}
完成后(没有更多的工作),jobs
通道关闭,这向消费者发出信号,表明不会有更多的工作到达。
请注意,produce()
将 jobs
通道视为 仅发送 ,因为制作者只需要这样做:send 作业(除了 closing 它,但在 send only 频道上也是允许的)。生产者中的意外接收将是编译时错误(在编译时及早检测到)。
消费者的责任是只要能接收到任务就接收任务,并执行它们:
func consume(id int, jobs <-chan *Job, results chan<- *Job) {
defer wg.Done()
for job := range jobs {
sleepMs := rand.Intn(1000)
fmt.Printf("worker #%d received: '%s', sleep %dms\n", id, job.Work, sleepMs)
time.Sleep(time.Duration(sleepMs) * time.Millisecond)
job.Result = job.Work + fmt.Sprintf("-%dms", sleepMs)
results <- job
}
}
请注意 consume()
将 jobs
频道视为 仅接收 ;消费者只需要从中接收。类似地,results
渠道是 仅向消费者发送 。
另请注意,results
通道不能在这里关闭,因为有多个消费者goroutines,只有第一次尝试关闭它会成功,更多的会成功导致运行时恐慌! results
通道可以(必须)在所有消费者 goroutine 结束后关闭,因为这样我们就可以确定不会在 results
通道上发送更多值(结果)。
我们有需要分析的结果:
func analyze(results <-chan *Job) {
defer wg2.Done()
for job := range results {
fmt.Printf("result: %s\n", job.Result)
}
}
如您所见,只要结果可能出现,它也会收到结果(直到 results
通道关闭)。分析器的 results
通道是 仅接收 。
请注意通道类型的使用:只要足够,仅使用 单向 通道类型以在编译时尽早检测和防止错误。如果确实需要双向,请仅使用 双向 通道类型。
这就是所有这些粘合在一起的方式:
func main() {
jobs := make(chan *Job, 100) // Buffered channel
results := make(chan *Job, 100) // Buffered channel
// Start consumers:
for i := 0; i < 5; i++ { // 5 consumers
wg.Add(1)
go consume(i, jobs, results)
}
// Start producing
go produce(jobs)
// Start analyzing:
wg2.Add(1)
go analyze(results)
wg.Wait() // Wait all consumers to finish processing jobs
// All jobs are processed, no more values will be sent on results:
close(results)
wg2.Wait() // Wait analyzer to analyze all results
}
示例输出:
这是一个示例输出:
如您所见,结果即将到来并在所有作业入队之前进行分析:
worker #4 received: 'e', sleep 81ms
worker #0 received: 'a', sleep 887ms
worker #1 received: 'b', sleep 847ms
worker #2 received: 'c', sleep 59ms
worker #3 received: 'd', sleep 81ms
worker #2 received: 'f', sleep 318ms
result: c-59ms
worker #4 received: 'g', sleep 425ms
result: e-81ms
worker #3 received: 'h', sleep 540ms
result: d-81ms
worker #2 received: 'i', sleep 456ms
result: f-318ms
worker #4 received: 'j', sleep 300ms
result: g-425ms
worker #3 received: 'k', sleep 694ms
result: h-540ms
worker #4 received: 'l', sleep 511ms
result: j-300ms
worker #2 received: 'm', sleep 162ms
result: i-456ms
worker #1 received: 'n', sleep 89ms
result: b-847ms
worker #0 received: 'o', sleep 728ms
result: a-887ms
worker #1 received: 'p', sleep 274ms
result: n-89ms
worker #2 received: 'q', sleep 211ms
result: m-162ms
worker #2 received: 'r', sleep 445ms
result: q-211ms
worker #1 received: 's', sleep 237ms
result: p-274ms
worker #3 received: 't', sleep 106ms
result: k-694ms
worker #4 received: 'u', sleep 495ms
result: l-511ms
worker #3 received: 'v', sleep 466ms
result: t-106ms
worker #1 received: 'w', sleep 528ms
result: s-237ms
worker #0 received: 'x', sleep 258ms
result: o-728ms
worker #2 received: 'y', sleep 47ms
result: r-445ms
worker #2 received: 'z', sleep 947ms
result: y-47ms
result: u-495ms
result: x-258ms
result: v-466ms
result: w-528ms
result: z-947ms
在 Go Playground 上试用完整的应用程序。
没有results
频道
如果我们不使用 results
通道但消费者 goroutine 会立即处理结果(在我们的例子中打印它),代码会大大简化。在这种情况下,我们不需要 2 个 sync.WaitGroup
值(第二个只需要等待分析器完成)。
没有 results
频道,完整的解决方案是这样的:
var wg sync.WaitGroup
type Job struct {
Id int
Work string
}
func produce(jobs chan<- *Job) {
// Generate jobs:
id := 0
for c := 'a'; c <= 'z'; c++ {
id++
jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)}
}
close(jobs)
}
func consume(id int, jobs <-chan *Job) {
defer wg.Done()
for job := range jobs {
sleepMs := rand.Intn(1000)
fmt.Printf("worker #%d received: '%s', sleep %dms\n", id, job.Work, sleepMs)
time.Sleep(time.Duration(sleepMs) * time.Millisecond)
fmt.Printf("result: %s\n", job.Work+fmt.Sprintf("-%dms", sleepMs))
}
}
func main() {
jobs := make(chan *Job, 100) // Buffered channel
// Start consumers:
for i := 0; i < 5; i++ { // 5 consumers
wg.Add(1)
go consume(i, jobs)
}
// Start producing
go produce(jobs)
wg.Wait() // Wait all consumers to finish processing jobs
}
输出是 "like" 通道 results
的输出(当然 execution/completion 顺序是随机的)。
在 Go Playground 上试试这个变体。
我正在尝试使用 goroutines 编写一个简单的工作池。
- 我写的代码是地道的吗?如果不是,那应该改变什么?
- 我希望能够将工作线程的最大数量设置为 5,并在所有 5 个都忙时阻塞直到有一个工作线程可用。我如何将其扩展到最多只有 5 个工人?我是否生成静态 5 goroutines,并给每个 goroutines
work_channel
?
代码:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func worker(id string, work string, o chan string, wg *sync.WaitGroup) {
defer wg.Done()
sleepMs := rand.Intn(1000)
fmt.Printf("worker '%s' received: '%s', sleep %dms\n", id, work, sleepMs)
time.Sleep(time.Duration(sleepMs) * time.Millisecond)
o <- work + fmt.Sprintf("-%dms", sleepMs)
}
func main() {
var work_channel = make(chan string)
var results_channel = make(chan string)
// create goroutine per item in work_channel
go func() {
var c = 0
var wg sync.WaitGroup
for work := range work_channel {
wg.Add(1)
go worker(fmt.Sprintf("%d", c), work, results_channel, &wg)
c++
}
wg.Wait()
fmt.Println("closing results channel")
close(results_channel)
}()
// add work to the work_channel
go func() {
for c := 'a'; c < 'z'; c++ {
work_channel <- fmt.Sprintf("%c", c)
}
close(work_channel)
fmt.Println("sent work to work_channel")
}()
for x := range results_channel {
fmt.Printf("result: %s\n", x)
}
}
你可以实现一个计数信号量来限制 goroutine 并发。
var tokens = make(chan struct{}, 20)
func worker(id string, work string, o chan string, wg *sync.WaitGroup) {
defer wg.Done()
tokens <- struct{}{} // acquire a token before performing work
sleepMs := rand.Intn(1000)
fmt.Printf("worker '%s' received: '%s', sleep %dms\n", id, work, sleepMs)
time.Sleep(time.Duration(sleepMs) * time.Millisecond)
<-tokens // release the token
o <- work + fmt.Sprintf("-%dms", sleepMs)
}
这是一般用来限制worker数量的设计。您当然可以更改 releasing/acquiring 标记的位置以适合您的代码。
您的解决方案在任何意义上都不是工作协程池:您的代码不限制并发协程,也不 "reuse" 协程(它总是在收到新作业时启动一个新协程)。
生产者消费者模式
发布于 Bruteforce MD5 Password cracker, you can make use of the producer-consumer pattern。你可以有一个指定的 producer goroutine 来生成作业(要做的事情/计算),并将它们发送到 jobs 频道。您可以有一个固定的 consumer goroutines 池(例如其中的 5 个),它们将在交付作业的通道上循环,并且每个 goroutines 将执行/完成接收到的作业。
producer goroutine 可以在所有作业生成并发送后简单地关闭 jobs
通道,正确地向 consumers 发出信号不会再有工作了。通道上的 for ... range
构造处理 "close" 事件并正确终止。请注意,关闭频道之前发送的所有作业仍将被传送。
这将导致一个干净的设计,将导致固定(但任意)数量的 goroutines,并且它将始终利用 100% CPU(如果 goroutines 的数量大于 [=173 的数量=] 核心)。它还具有一个优点,即可以正确选择通道容量(缓冲通道)和消费者的数量 goroutines。
,它可以为"throttled"。请注意,具有指定生产者 goroutine 的此模型不是强制性的。您也可以有多个 goroutines 来生成作业,但是您也必须同步它们以仅在所有生产者 goroutine 完成生成作业时关闭 jobs
通道 - 否则尝试在 jobs
通道上发送另一个作业当它已经关闭时会导致运行时恐慌。通常生产作业的成本很低,并且生产速度比执行速度快得多,因此这种在 1 个 goroutine 中生产作业同时许多人正在使用/执行它们的模型在实践中是很好的。
处理结果:
如果作业有结果,您可以选择指定 结果 渠道来交付结果 ("sent back"),或者您可以选择处理当工作完成/完成时,结果会出现在消费者中。后者甚至可以通过具有处理结果的 "callback" 函数来实现。重要的是结果是可以独立处理还是需要合并(例如 map-reduce 框架)或聚合。
如果你使用 results
通道,你还需要一个 goroutine 从它接收值,防止消费者被阻塞(如果 results
的缓冲区被填满就会发生)。
有results
频道
我不会将简单的 string
值作为作业和结果发送,而是创建一个可以包含任何附加信息的包装器类型,因此它更加灵活:
type Job struct {
Id int
Work string
Result string
}
注意 Job
结构也包装了结果,所以当我们发回结果时,它也包含原始的 Job
作为上下文 - 通常非常有用。另请注意,仅在通道上发送指针 (*Job
) 而不是 Job
值是有利可图的,因此无需制作 Job
的 "countless" 副本,以及Job
结构值的大小变得无关紧要。
下面是这个生产者-消费者的样子:
我会使用 2 个 sync.WaitGroup
值,它们的作用如下:
var wg, wg2 sync.WaitGroup
生产者负责生成要执行的作业:
func produce(jobs chan<- *Job) {
// Generate jobs:
id := 0
for c := 'a'; c <= 'z'; c++ {
id++
jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)}
}
close(jobs)
}
完成后(没有更多的工作),jobs
通道关闭,这向消费者发出信号,表明不会有更多的工作到达。
请注意,produce()
将 jobs
通道视为 仅发送 ,因为制作者只需要这样做:send 作业(除了 closing 它,但在 send only 频道上也是允许的)。生产者中的意外接收将是编译时错误(在编译时及早检测到)。
消费者的责任是只要能接收到任务就接收任务,并执行它们:
func consume(id int, jobs <-chan *Job, results chan<- *Job) {
defer wg.Done()
for job := range jobs {
sleepMs := rand.Intn(1000)
fmt.Printf("worker #%d received: '%s', sleep %dms\n", id, job.Work, sleepMs)
time.Sleep(time.Duration(sleepMs) * time.Millisecond)
job.Result = job.Work + fmt.Sprintf("-%dms", sleepMs)
results <- job
}
}
请注意 consume()
将 jobs
频道视为 仅接收 ;消费者只需要从中接收。类似地,results
渠道是 仅向消费者发送 。
另请注意,results
通道不能在这里关闭,因为有多个消费者goroutines,只有第一次尝试关闭它会成功,更多的会成功导致运行时恐慌! results
通道可以(必须)在所有消费者 goroutine 结束后关闭,因为这样我们就可以确定不会在 results
通道上发送更多值(结果)。
我们有需要分析的结果:
func analyze(results <-chan *Job) {
defer wg2.Done()
for job := range results {
fmt.Printf("result: %s\n", job.Result)
}
}
如您所见,只要结果可能出现,它也会收到结果(直到 results
通道关闭)。分析器的 results
通道是 仅接收 。
请注意通道类型的使用:只要足够,仅使用 单向 通道类型以在编译时尽早检测和防止错误。如果确实需要双向,请仅使用 双向 通道类型。
这就是所有这些粘合在一起的方式:
func main() {
jobs := make(chan *Job, 100) // Buffered channel
results := make(chan *Job, 100) // Buffered channel
// Start consumers:
for i := 0; i < 5; i++ { // 5 consumers
wg.Add(1)
go consume(i, jobs, results)
}
// Start producing
go produce(jobs)
// Start analyzing:
wg2.Add(1)
go analyze(results)
wg.Wait() // Wait all consumers to finish processing jobs
// All jobs are processed, no more values will be sent on results:
close(results)
wg2.Wait() // Wait analyzer to analyze all results
}
示例输出:
这是一个示例输出:
如您所见,结果即将到来并在所有作业入队之前进行分析:
worker #4 received: 'e', sleep 81ms
worker #0 received: 'a', sleep 887ms
worker #1 received: 'b', sleep 847ms
worker #2 received: 'c', sleep 59ms
worker #3 received: 'd', sleep 81ms
worker #2 received: 'f', sleep 318ms
result: c-59ms
worker #4 received: 'g', sleep 425ms
result: e-81ms
worker #3 received: 'h', sleep 540ms
result: d-81ms
worker #2 received: 'i', sleep 456ms
result: f-318ms
worker #4 received: 'j', sleep 300ms
result: g-425ms
worker #3 received: 'k', sleep 694ms
result: h-540ms
worker #4 received: 'l', sleep 511ms
result: j-300ms
worker #2 received: 'm', sleep 162ms
result: i-456ms
worker #1 received: 'n', sleep 89ms
result: b-847ms
worker #0 received: 'o', sleep 728ms
result: a-887ms
worker #1 received: 'p', sleep 274ms
result: n-89ms
worker #2 received: 'q', sleep 211ms
result: m-162ms
worker #2 received: 'r', sleep 445ms
result: q-211ms
worker #1 received: 's', sleep 237ms
result: p-274ms
worker #3 received: 't', sleep 106ms
result: k-694ms
worker #4 received: 'u', sleep 495ms
result: l-511ms
worker #3 received: 'v', sleep 466ms
result: t-106ms
worker #1 received: 'w', sleep 528ms
result: s-237ms
worker #0 received: 'x', sleep 258ms
result: o-728ms
worker #2 received: 'y', sleep 47ms
result: r-445ms
worker #2 received: 'z', sleep 947ms
result: y-47ms
result: u-495ms
result: x-258ms
result: v-466ms
result: w-528ms
result: z-947ms
在 Go Playground 上试用完整的应用程序。
没有results
频道
如果我们不使用 results
通道但消费者 goroutine 会立即处理结果(在我们的例子中打印它),代码会大大简化。在这种情况下,我们不需要 2 个 sync.WaitGroup
值(第二个只需要等待分析器完成)。
没有 results
频道,完整的解决方案是这样的:
var wg sync.WaitGroup
type Job struct {
Id int
Work string
}
func produce(jobs chan<- *Job) {
// Generate jobs:
id := 0
for c := 'a'; c <= 'z'; c++ {
id++
jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)}
}
close(jobs)
}
func consume(id int, jobs <-chan *Job) {
defer wg.Done()
for job := range jobs {
sleepMs := rand.Intn(1000)
fmt.Printf("worker #%d received: '%s', sleep %dms\n", id, job.Work, sleepMs)
time.Sleep(time.Duration(sleepMs) * time.Millisecond)
fmt.Printf("result: %s\n", job.Work+fmt.Sprintf("-%dms", sleepMs))
}
}
func main() {
jobs := make(chan *Job, 100) // Buffered channel
// Start consumers:
for i := 0; i < 5; i++ { // 5 consumers
wg.Add(1)
go consume(i, jobs)
}
// Start producing
go produce(jobs)
wg.Wait() // Wait all consumers to finish processing jobs
}
输出是 "like" 通道 results
的输出(当然 execution/completion 顺序是随机的)。
在 Go Playground 上试试这个变体。