Executor framework - Producer Consumer pattern
In section 5.3.1, Java_author mentions:
... many producer-consumer designs can be expressed using the Executor
task execution framework, which itself uses the producer-consumer pattern.
... The producer-consumer pattern offers a thread-friendly means of decomposing the problem into simpler components(if possible).
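Does the Executor framework implementation internally follow the producer-consumer pattern?
If yes, how does the idea of the producer-consumer pattern help in implementing the Executor framework?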
The Executor framework uses the producer-consumer pattern.
From Wikipedia:
In computing, the producer–consumer problem (also known as the
bounded-buffer problem) is a classic example of a multi-process
synchronization problem. The problem describes two processes, the
producer and the consumer, who share a common, fixed-size buffer used
as a queue. The producer's job is to generate data, put it into the
buffer, and start again. At the same time, the consumer is consuming
the data (i.e., removing it from the buffer), one piece at a time. The
problem is to make sure that the producer won't try to add data into
the buffer if it's full and that the consumer won't try to remove data
from an empty buffer.
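The bounded-buffer behaviour described in the quote can be sketched with a BlockingQueue. This is only an illustration, not code from the JDK: the class name BoundedBufferDemo, the method produceAndConsume, and the POISON sentinel are all made up for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BoundedBufferDemo {
    // Hypothetical sentinel that tells the consumer to stop.
    private static final int POISON = -1;

    /** Sends 0..count-1 through a bounded buffer and returns the sum the consumer saw. */
    static long produceAndConsume(int count, int capacity) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        long[] sum = {0};

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    buffer.put(i);          // blocks while the buffer is full
                }
                buffer.put(POISON);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                // take() blocks while the buffer is empty
                for (int v = buffer.take(); v != POISON; v = buffer.take()) {
                    sum[0] += v;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();                // join() makes the consumer's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println(produceAndConsume(10, 2)); // prints 45
    }
}
```

Neither thread checks whether the buffer is full or empty; put() and take() do the waiting.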
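If we look at the different ExecutorService implementations, and more specifically the ThreadPoolExecutor class, each basically has the following:
- A queue where jobs are submitted and held
- A number of threads that consume the tasks submitted to the queue.
Depending on the type of executor service, these parameters change.
For example,
- A fixed thread pool uses a LinkedBlockingQueue and a user-configured number of threads
- A cached thread pool uses a SynchronousQueue and between 0 and Integer.MAX_VALUE threads, depending on the number of submitted tasks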
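The two configurations above can be written out as explicit ThreadPoolExecutor constructor calls. To my knowledge these arguments match what Executors.newFixedThreadPool and Executors.newCachedThreadPool pass in OpenJDK, but treat that as an assumption and check the source of your JDK. The class PoolConfigurations and its method names are hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolConfigurations {
    /** Fixed pool: core == max == nThreads, unbounded LinkedBlockingQueue. */
    static ThreadPoolExecutor fixed(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    /** Cached pool: 0 core threads, up to Integer.MAX_VALUE threads, direct hand-off queue. */
    static ThreadPoolExecutor cached() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor fixed = fixed(4);
        ThreadPoolExecutor cached = cached();
        System.out.println(fixed.getQueue().getClass().getSimpleName());   // LinkedBlockingQueue
        System.out.println(cached.getQueue().getClass().getSimpleName());  // SynchronousQueue
        fixed.shutdown();
        cached.shutdown();
    }
}
```

A SynchronousQueue has no capacity, so every offer() fails unless a worker is already waiting in take() or poll(). That is why the cached pool creates a new thread whenever no idle one is available.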
The implementation of execute():
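public void execute(Runnable command) {
    int c = ctl.get();
    // 1. Fewer than corePoolSize workers: start a new worker with this task.
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 2. Otherwise try to queue the task (the producer side).
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3. Queue full or pool not running: try a non-core worker, else reject.
    else if (!addWorker(command, false))
        reject(command);
}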
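The three branches of execute() can be observed from outside with a deliberately tiny pool. The sketch below assumes one core thread, at most two threads, and a queue of capacity one; the class name ExecutePathsDemo and the blocking task are illustrative only.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecutePathsDemo {
    /** Returns "poolSize/queued/rejected" after submitting four tasks that block. */
    static String run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();            // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        boolean rejected = false;
        try {
            pool.execute(blocker);  // below corePoolSize: starts the core worker
            pool.execute(blocker);  // core worker busy: task goes into the queue
            pool.execute(blocker);  // queue full: starts a second, non-core worker
            pool.execute(blocker);  // queue full and at maximumPoolSize: rejected
        } catch (RejectedExecutionException e) {
            rejected = true;        // default handler is AbortPolicy
        }

        String state = pool.getPoolSize() + "/" + pool.getQueue().size() + "/" + rejected;
        release.countDown();
        pool.shutdown();
        return state;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 2/1/true
    }
}
```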
Now look at addWorker():
private boolean addWorker(Runnable firstTask, boolean core) {
    // After some checks, it creates a Worker and starts its thread
    Worker w = new Worker(firstTask);
    Thread t = w.thread;
    // After some more checks, the thread is started
    t.start();
    // ... remaining bookkeeping and the boolean return value are elided
}
The implementation of Worker:
/**
 * Class Worker mainly maintains interrupt control state for
 * threads running tasks, along with other minor bookkeeping.
 * This class opportunistically extends AbstractQueuedSynchronizer
 * to simplify acquiring and releasing a lock surrounding each
 * task execution. This protects against interrupts that are
 * intended to wake up a worker thread waiting for a task from
 * instead interrupting a task being run. We implement a simple
 * non-reentrant mutual exclusion lock rather than use ReentrantLock
 * because we do not want worker tasks to be able to reacquire the
 * lock when they invoke pool control methods like setCorePoolSize.
 */
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    // ... fields and lock methods elided

    /** Delegates main run loop to outer runWorker */
    public void run() {
        runWorker(this);
    }
}

// Method of the enclosing ThreadPoolExecutor class
final void runWorker(Worker w) {
    Runnable task = w.firstTask;
    w.firstTask = null;
    boolean completedAbruptly = true;
    try {
        // Consumer loop: run the first task, then keep pulling from the queue
        while (task != null || (task = getTask()) != null) {
            w.lock();
            clearInterruptsForTaskRun();
            try {
                beforeExecute(w.thread, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
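The beforeExecute and afterExecute calls in runWorker are protected hooks that a subclass may override, which makes the order of events inside the worker loop visible. The subclass HookedPool and its events list are hypothetical names for this sketch.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class HookedPool extends ThreadPoolExecutor {
    // Thread-safe log of what the worker thread did, in order.
    final List<String> events = new CopyOnWriteArrayList<>();

    HookedPool() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        events.add("before");
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        events.add(t == null ? "after" : "after:" + t.getClass().getSimpleName());
    }

    /** Runs one task and returns the recorded event order. */
    static List<String> demo() {
        HookedPool pool = new HookedPool();
        pool.execute(() -> pool.events.add("run"));
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return pool.events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [before, run, after]
    }
}
```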
Which Runnable gets executed depends on the following logic.
/**
 * Performs blocking or timed wait for a task, depending on
 * current configuration settings, or returns null if this worker
 * must exit because of any of:
 * 1. There are more than maximumPoolSize workers (due to
 *    a call to setMaximumPoolSize).
 * 2. The pool is stopped.
 * 3. The pool is shutdown and the queue is empty.
 * 4. This worker timed out waiting for a task, and timed-out
 *    workers are subject to termination (that is,
 *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
 *    both before and after the timed wait.
 *
 * @return task, or null if the worker must exit, in which case
 *         workerCount is decremented
 */
private Runnable getTask() {
    // After some checks (elided), the code below returns a Runnable.
    // In the JDK source this sits inside a retry loop; `timed` and
    // `timedOut` are locals set up by the elided code.
    try {
        Runnable r = timed ?
            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
        if (r != null)
            return r;
        timedOut = true;
    } catch (InterruptedException retry) {
        timedOut = false;
    }
}
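The difference between the two branches of that ternary is that take() waits indefinitely, while the timed poll() gives up and returns null, which is what lets an idle worker exit. A minimal sketch, assuming a hypothetical helper nextTask that mirrors only the ternary and none of the surrounding pool-state checks:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class NextTaskSketch {
    /** Hypothetical helper mirroring the ternary in getTask(); returns null on timeout. */
    static Runnable nextTask(BlockingQueue<Runnable> queue, boolean timed, long keepAliveNanos)
            throws InterruptedException {
        return timed
                ? queue.poll(keepAliveNanos, TimeUnit.NANOSECONDS)  // waits at most keepAliveNanos
                : queue.take();                                     // waits until a task arrives
    }

    /** Returns "gotJob/timedOut" for one queued job followed by an empty queue. */
    static String demo() {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        Runnable job = () -> { };
        try {
            queue.put(job);
            // Queue is non-empty, so take() returns immediately with the job.
            boolean gotJob = nextTask(queue, false, 0L) == job;
            // Queue is now empty, so the timed poll returns null after about 50 ms.
            boolean timedOut = nextTask(queue, true, TimeUnit.MILLISECONDS.toNanos(50)) == null;
            return gotJob + "/" + timedOut;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true/true
    }
}
```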
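Summary:
- The producer hands a Runnable to the execute API, which adds it to the queue with workQueue.offer(command). A Callable passed to submit() is first wrapped in a FutureTask, which is itself a Runnable.
- The execute() method creates Worker threads as needed.
- Each Worker thread runs a loop that continues until getTask() returns null. It gets its tasks (e.g. a Runnable) from getTask().
- getTask() polls the BlockingQueue<Runnable> workQueue and takes a Runnable from it. It is the consumer of the BlockingQueue.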
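The division of roles in this summary can be reduced to a few lines. The following is a deliberately naive sketch, not how ThreadPoolExecutor is written: TinyPool, its STOP poison pill, and shutdownAndWait are all hypothetical, and there is no pool state, rejection, or thread timeout.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical minimal pool with the same producer-consumer shape. */
class TinyPool {
    private static final Runnable STOP = () -> { };   // poison pill, one per worker
    private final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
    private final List<Thread> workers = new ArrayList<>();

    TinyPool(int nThreads) {
        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread(this::runWorker);
            workers.add(t);
            t.start();
        }
    }

    /** Producer side: hand the task to the queue. */
    void execute(Runnable task) {
        workQueue.add(task);    // queue is unbounded, so add() does not fail for capacity
    }

    /** Consumer side: each worker takes tasks until it sees STOP. */
    private void runWorker() {
        try {
            for (Runnable task = workQueue.take(); task != STOP; task = workQueue.take()) {
                task.run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Queues one STOP per worker, then waits for every worker to exit. */
    void shutdownAndWait() {
        for (int i = 0; i < workers.size(); i++) {
            workQueue.add(STOP);
        }
        try {
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    static int demo() {
        TinyPool pool = new TinyPool(3);
        AtomicInteger counter = new AtomicInteger();
        for (int i = 0; i < 100; i++) {
            pool.execute(counter::incrementAndGet);
        }
        pool.shutdownAndWait();
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 100
    }
}
```

The caller of execute() and the worker threads never touch each other directly; the queue is the only shared object, and it does all the blocking and hand-off.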
Does Executor framework implementation internally follow producer-consumer pattern?
Yes, as explained above.
If yes, How the idea of producer-consumer pattern helps in implementation of Executor framework?
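BlockingQueue implementations such as ArrayBlockingQueue, and the ExecutorService implementation ThreadPoolExecutor, are thread-safe. This spares the programmer from writing explicit synchronized, wait, and notify calls to achieve the same result.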
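For comparison, here is roughly what a programmer would have to write by hand without BlockingQueue. It is a sketch under simple assumptions (intrinsic lock, notifyAll on every change); the class HandRolledBuffer is hypothetical and not taken from any library.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical hand-written bounded buffer using synchronized, wait and notifyAll. */
class HandRolledBuffer<T> {
    private final Queue<T> items = new ArrayDeque<>();
    private final int capacity;

    HandRolledBuffer(int capacity) {
        this.capacity = capacity;
    }

    synchronized void put(T item) throws InterruptedException {
        while (items.size() == capacity) {
            wait();             // loop guards against spurious wakeups
        }
        items.add(item);
        notifyAll();            // wake any consumer waiting for an item
    }

    synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        T item = items.remove();
        notifyAll();            // wake any producer waiting for space
        return item;
    }

    /** Sends 1..5 through a buffer of capacity 2 and returns the sum received. */
    static int demo() {
        HandRolledBuffer<Integer> buffer = new HandRolledBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < 5; i++) {
                sum += buffer.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 15
    }
}
```

Every detail here (the while loops around wait(), notifyAll rather than notify, holding the lock on both sides) is something ArrayBlockingQueue and ThreadPoolExecutor already get right for the caller.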