执行器框架——生产者消费者模式

Executor framework - Producer Consumer pattern

在5.3.1节Java_author中提到,

... many producer-consumer designs can be expressed using the Executor task execution framework, which itself uses the producer-consumer pattern.

... The producer-consumer pattern offers a thread-friendly means of decomposing the problem into simpler components(if possible).


Executor 框架实现在内部是否遵循生产者-消费者模式?

如果是,生产者-消费者模式的思想如何帮助实施Executor框架?

Executor framework 使用 producer-consumer 模式。

来自维基百科,

In computing, the producer–consumer problem (also known as the bounded-buffer problem) is a classic example of a multi-process synchronization problem. The problem describes two processes, the producer and the consumer, who share a common, fixed-size buffer used as a queue. The producer's job is to generate data, put it into the buffer, and start again. At the same time, the consumer is consuming the data (i.e., removing it from the buffer), one piece at a time. The problem is to make sure that the producer won't try to add data into the buffer if it's full and that the consumer won't try to remove data from an empty buffer.

如果我们查看不同的 ExecutorService framework 实现,更具体地说 ThreadPoolExecutor class,它基本上具有以下内容:

  1. 一个队列,作业在其中提交和保存
  2. 消耗提交到队列的任务的线程数。

根据执行器服务的类型,这些参数会发生变化

例如,

  • 固定线程池使用 LinkedBlockingQueue 和用户配置的线程数
  • 缓存线程池使用 SynchronousQueue0Integer.MAX_VALUE 之间的线程数,具体取决于提交的任务数

检查 ThreadPoolExecutor

的实现
public void execute(Runnable command) {
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

现在检查

private boolean addWorker(Runnable firstTask, boolean core) {
     // After some checks, it creates Worker and start the thread
    Worker w = new Worker(firstTask);
    Thread t = w.thread;

   // After some checks, thread has been started
   t.start();
}

实施Worker

  /**
     * Class Worker mainly maintains interrupt control state for
     * threads running tasks, along with other minor bookkeeping.
     * This class opportunistically extends AbstractQueuedSynchronizer
     * to simplify acquiring and releasing a lock surrounding each
     * task execution.  This protects against interrupts that are
     * intended to wake up a worker thread waiting for a task from
     * instead interrupting a task being run.  We implement a simple
     * non-reentrant mutual exclusion lock rather than use ReentrantLock
     * because we do not want worker tasks to be able to reacquire the
     * lock when they invoke pool control methods like setCorePoolSize.
     */
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {

      /** Delegates main run loop to outer runWorker  */
       public void run() {
            runWorker(this);
       }

    final void runWorker(Worker w) {
          Runnable task = w.firstTask;
          w.firstTask = null;
          boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            clearInterruptsForTaskRun();
            try {
                beforeExecute(w.thread, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }

执行哪个 Runnable 取决于以下逻辑。

/**
 * Performs blocking or timed wait for a task, depending on
 * current configuration settings, or returns null if this worker
 * must exit because of any of:
 * 1. There are more than maximumPoolSize workers (due to
 *    a call to setMaximumPoolSize).
 * 2. The pool is stopped.
 * 3. The pool is shutdown and the queue is empty.
 * 4. This worker timed out waiting for a task, and timed-out
 *    workers are subject to termination (that is,
 *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
 *    both before and after the timed wait.
 *
 * @return task, or null if the worker must exit, in which case
 *         workerCount is decremented
 */
private Runnable getTask() {
     // After some checks, below code returns Runnable

      try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
}

总结:

  1. 生产者execute API 和 workQueue.offer(command) 中添加 RunnableCallable

  2. execute() 方法根据需要创建 Worker 线程

  3. 这个Worker线程在无限循环中运行。它从 getTask()

  4. 获取任务(例如 Runnable
  5. getTask()BlockingQueue<Runnable> workQueue) 上汇集并取得 Runnable。它是BlockingQueue消费者

Does Executor framework implementation internally follow producer-consumer pattern?

是的,如上所述。

If yes, How the idea of producer-consumer pattern helps in implementation of Executor framework?

BlockingQueue 实现如 ArrayBlockingQueueExecutorService 实现 ThreadPoolExecutor 是线程安全的。程序员显式实现同步、等待和通知调用以实现相同的开销已减少。