Golang 工作池 - 从工作中排队新工作
Golang worker pools - Queue new jobs from within jobs
我正在尝试从一组桶中同时构建一棵树,并且鉴于工作模式在 go 中似乎非常流行,我尝试将其应用于我的问题。基本上,我启动给定数量的工人并让他们收听共享的工作频道。然后第一个工作人员接收树根节点作为第一个工作并用相关信息填充它,然后分支并创建另外 2 个工作。然后这些工作应该分配给其他工人,然后递归地产生更多的工作,直到构建整棵树。
我天真的方法的简化表示类似于:
func workers(count int) {
wg := sync.WaitGroup{}
wg.Add(count)
jobs := make(chan job)
for i := 0; i < count; i++ {
go func() {
// worker waits for job and then executes it
for j := range jobs {
processJob(j, jobs)
}
wg.Done()
}()
}
// start with some initial job
jobs <- job{}
wg.Wait()
}
func processJob(j job, jobs chan job) {
// jobs channel is closed when tree is finished
if done {
close(jobs)
}
// Do some more irrelevant stuff
// sometimes 2 new jobs result from this one
jobs <- job{}
jobs <- job{}
// but that doesn't work, if all workers try to send and no one receives
}
问题是,我无法在 1 个工作中添加 2 个新工作,因为在某些时候每个工作人员都会忙于尝试将工作发送到通道,而没有工作人员在接收端。
任何人都可以指出一个优雅的解决方案的方向,或者我对这个问题的整个方法是错误的吗?
如果没有其他工作人员准备好处理作业,则使用当前工作人员:
func doJob(j job, jobs chan job) {
select {
case jobs <- j:
default:
// Send to jobs was not ready, do the job
// in the current worker.
processJob(j, jobs)
}
}
将发送语句 jobs <- job{}
替换为调用 doJob(job{}, jobs)
。
使用缓冲通道让工作人员忙碌:
jobs := make(chan job, N)
调高 N
,直到找到工作人员最忙的值。 N
的一个好的起始值是 count
。不需要此调整来防止死锁。当N为0时程序不会死锁。
我正在尝试从一组桶中同时构建一棵树,并且鉴于工作模式在 go 中似乎非常流行,我尝试将其应用于我的问题。基本上,我启动给定数量的工人并让他们收听共享的工作频道。然后第一个工作人员接收树根节点作为第一个工作并用相关信息填充它,然后分支并创建另外 2 个工作。然后这些工作应该分配给其他工人,然后递归地产生更多的工作,直到构建整棵树。 我天真的方法的简化表示类似于:
func workers(count int) {
wg := sync.WaitGroup{}
wg.Add(count)
jobs := make(chan job)
for i := 0; i < count; i++ {
go func() {
// worker waits for job and then executes it
for j := range jobs {
processJob(j, jobs)
}
wg.Done()
}()
}
// start with some initial job
jobs <- job{}
wg.Wait()
}
func processJob(j job, jobs chan job) {
// jobs channel is closed when tree is finished
if done {
close(jobs)
}
// Do some more irrelevant stuff
// sometimes 2 new jobs result from this one
jobs <- job{}
jobs <- job{}
// but that doesn't work, if all workers try to send and no one receives
}
问题是,我无法在 1 个工作中添加 2 个新工作,因为在某些时候每个工作人员都会忙于尝试将工作发送到通道,而没有工作人员在接收端。
任何人都可以指出一个优雅的解决方案的方向,或者我对这个问题的整个方法是错误的吗?
如果没有其他工作人员准备好处理作业,则使用当前工作人员:
func doJob(j job, jobs chan job) {
select {
case jobs <- j:
default:
// Send to jobs was not ready, do the job
// in the current worker.
processJob(j, jobs)
}
}
将发送语句 jobs <- job{}
替换为调用 doJob(job{}, jobs)
。
使用缓冲通道让工作人员忙碌:
jobs := make(chan job, N)
调高 N
,直到找到工作人员最忙的值。 N
的一个好的起始值是 count
。不需要此调整来防止死锁。当N为0时程序不会死锁。