工作线程池
Worker thread pool
在 http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/ 提供的示例中,该示例已在很多地方被引用。
func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
// a job request has been received
go func(job Job) {
// try to obtain a worker job channel that is available.
// this will block until a worker is idle
jobChannel := <-d.WorkerPool
// dispatch the job to the worker job channel
jobChannel <- job
}(job)
}
}
}
调度服务了 MaxWorker
个作业后,工作人员池(chan chan 作业)是否会耗尽?由于<-d.WorkerPool
是从渠道中拉取的,作业渠道在第一次调用第一种dispatcher.Run()
后没有得到补充?还是我 missing/misreading 什么东西? WorkerPool 如何通过可用的工作渠道得到补充?
go func(job Job) {
// try to obtain a worker job channel that is available.
// this will block until a worker is idle
jobChannel := <-d.WorkerPool
// dispatch the job to the worker job channel
jobChannel <- job
}(job)
如果你仔细阅读worker的代码,你会发现
w.WorkerPool <- w.JobChannel
每次循环开始,worker本身的channel被放回
我复制下面的整个函数:
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.JobChannel
select {
case job := <-w.JobChannel:
// we have received a work request.
if err := job.Payload.UploadToS3(); err != nil {
log.Errorf("Error uploading to S3: %s", err.Error())
}
case <-w.quit:
// we have received a signal to stop
return
}
}
}()
}
在 http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/ 提供的示例中,该示例已在很多地方被引用。
func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
// a job request has been received
go func(job Job) {
// try to obtain a worker job channel that is available.
// this will block until a worker is idle
jobChannel := <-d.WorkerPool
// dispatch the job to the worker job channel
jobChannel <- job
}(job)
}
}
}
调度服务了 MaxWorker
个作业后,工作人员池(chan chan 作业)是否会耗尽?由于<-d.WorkerPool
是从渠道中拉取的,作业渠道在第一次调用第一种dispatcher.Run()
后没有得到补充?还是我 missing/misreading 什么东西? WorkerPool 如何通过可用的工作渠道得到补充?
go func(job Job) {
// try to obtain a worker job channel that is available.
// this will block until a worker is idle
jobChannel := <-d.WorkerPool
// dispatch the job to the worker job channel
jobChannel <- job
}(job)
如果你仔细阅读worker的代码,你会发现
w.WorkerPool <- w.JobChannel
每次循环开始,worker本身的channel被放回
我复制下面的整个函数:
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.JobChannel
select {
case job := <-w.JobChannel:
// we have received a work request.
if err := job.Payload.UploadToS3(); err != nil {
log.Errorf("Error uploading to S3: %s", err.Error())
}
case <-w.quit:
// we have received a signal to stop
return
}
}
}()
}