潜在递归任务的工作池(即,每个作业都可以排队其他作业)

Worker pool for a potentially recursive task (i.e., each job can queue other jobs)

我正在编写一个应用程序,用户可以从多个 "jobs"(实际上是 URL)开始。在开始(主例程)时,我将这些 URLs 添加到队列中,然后启动 x 个处理这些 URLs.

的 goroutines

在特殊情况下,a URL 指向的资源可能包含更多 URL 必须添加到队列中。这 3 名工人正在等待新工作的到来并处理它们。问题是:一旦每个工人都在等待工作(并且 none 正在生产任何工作),工人应该完全停止。所以要么他们都工作,要么没有人工作。

我当前的实现看起来像这样,但我认为它并不优雅。不幸的是,我想不出一个不包含竞争条件的更好方法,而且我不完全确定这个实现是否真的按预期工作:

var queue // from somewhere
const WORKER_COUNT = 3
var done chan struct{}

func work(working chan int) {
  absent := make(chan struct{}, 1)
  // if x>1 jobs in sequence are popped, send to "absent" channel only 1 struct.
  // This implementation also assumes that the select statement will be evaluated "in-order" (channel 2 only if channel 1 yields nothing) - is this actually correct? EDIT: It is, according to the specs.
  one := false
  for {
    select {
    case u, ok := <-queue.Pop():
      if !ok {
        close(absent)
        return
      }
      if !one {
        // I have started working (delta + 1)
        working <- 1
        absent <- struct{}{}
        one = true
      }
      // do work with u (which may lead to queue.Push(urls...))
    case <-absent: // no jobs at the moment. consume absent => wait
      one = false
      working <- -1
    }
  }
}

func Start() {
  working := make(chan int)
  for i := 0; i < WORKER_COUNT; i++ {
    go work(working)
  }
  // the amount of actually working workers...
  sum := 0
  for {
    delta := <-working
    sum += delta
    if sum == 0 {
      queue.Close() // close channel -> kill workers.
      done <- struct{}{}
      return
    }
  }
}

有没有更好的方法来解决这个问题?

您可以 use a sync.WaitGroup (see docs) 控制工作人员的生命周期,并使用非阻塞发送,这样工作人员在尝试排队更多作业时就不会死锁:

package main

import "sync"

const workers = 4

type job struct{}

func (j *job) do(enqueue func(job)) {
    // do the job, calling enqueue() for subtasks as needed
}

func main() {
    jobs, wg := make(chan job), new(sync.WaitGroup)
    var enqueue func(job)

    // workers
    for i := 0; i < workers; i++ {
        go func() {
            for j := range jobs {
                j.do(enqueue)
                wg.Done()
            }
        }()
    }

    // how to queue a job
    enqueue = func(j job) {
        wg.Add(1)
        select {
        case jobs <- j: // another worker took it
        default: // no free worker; do the job now
            j.do(enqueue)
            wg.Done()
        }
    }

    todo := make([]job, 1000)
    for _, j := range todo {
        enqueue(j)
    }
    wg.Wait()
    close(jobs)
}

尝试使用缓冲通道避免死锁的困难在于,您必须预先分配足够大的通道以确保在不阻塞的情况下容纳所有待处理的任务。有问题,除非,比方说,您要抓取的 URL 数量很少且已知。

当您回退到在当前线程中执行普通递归时,您没有静态缓冲区大小限制。当然,仍然存在限制:如果有太多工作待处理,您可能 运行 内存不足,理论上您可以通过深度递归耗尽堆栈(但这很难!)。因此,如果您要对整个 Web 进行爬网,则需要以更复杂的方式跟踪未决任务。

最后,作为一个更完整的示例,我对这段代码并不感到非常自豪,但我碰巧写了一个 function to kick off a parallel sort,它的递归方式与您的 URL 获取方式相同。