如何在 ForkJoinPool 中阻塞队列?
How to Block a Queue in ForkJoinPool?
当 ForkJoinPool 的队列已满时,我需要阻塞线程。
这可以在标准的 ThreadPoolExecutor 中完成,例如:
private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
return new ThreadPoolExecutor(nThreads, nThreads,
5000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
}
我知道,ForkJoinPool 中有一些 Dequeue,但我无法通过其 API 访问它。
更新:请看下面的答案。
经过一些研究,我很乐意回答这个问题:
原因:
由于以下原因,ForkJoinPool 的实现中没有这样的选项。
大多数j.u.c。执行者假定单个并发队列和许多线程。当多个线程 reading/writing 进入队列时,这会导致队列争用并降低性能。因此,这种方法不是很可扩展 --> 队列上的高争用会产生大量的上下文切换和 CPU-business.
实施:
在 ForkJoinPool 中,每个线程都有一个单独的双端队列 (Deque),由数组支持。为了最大限度地减少争用,工作窃取 发生在双端队列的尾部,而任务提交发生在当前线程(工作者)的头部。尾部包含最大的工作部分。换句话说,另一个工作线程从尾部窃取最小化与其他工作线程交互的次数 --> 更少的争用,更好的整体性能。
Doug Lea 在官方白皮书 "Java Fork/Join Framework" 中描述了这个想法。
可扩展性基准显示在“让它崩溃 -
Fork Join Pool的可扩展性
变通思路:
有全局提交队列。来自非 FJ 线程的提交进入提交队列(Workers 承担这些任务)。还有上面提到的Worker-queues。
队列的最大大小受数量限制:
/**
* Maximum size for queue arrays. Must be a power of two less
* than or equal to 1 << (31 - width of array entry) to ensure
* lack of wraparound of index calculations, but defined to a
* value a bit less than this to help users trap runaway
* programs before saturating systems.
*/
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
当队列满时抛出未经检查的异常:
RejectedExecutionException("Queue capacity exceeded")
java文档中对此进行了描述。
(另外,请参阅 UncaughtExceptionHandler
的 ThreadPool 构造函数)
我倾向于声称当前的实现没有这样的机制,这应该由我们在消费 API 中实现。
例如,可以按如下方式完成:
- 实施指数退避逻辑,尝试通过增加下一次重试的时间间隔来定期重新提交任务。
或者..
- 编写一个定期检查 submissionQueue 大小的节流器(参见
ForkJoinPool.getQueuedSubmissionCount()
)。
Here 的官方 JSR-166E java ForkJoinPool 代码以获取更多信息。
当 ForkJoinPool 的队列已满时,我需要阻塞线程。 这可以在标准的 ThreadPoolExecutor 中完成,例如:
private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
return new ThreadPoolExecutor(nThreads, nThreads,
5000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
}
我知道,ForkJoinPool 中有一些 Dequeue,但我无法通过其 API 访问它。
更新:请看下面的答案。
经过一些研究,我很乐意回答这个问题:
原因: 由于以下原因,ForkJoinPool 的实现中没有这样的选项。 大多数j.u.c。执行者假定单个并发队列和许多线程。当多个线程 reading/writing 进入队列时,这会导致队列争用并降低性能。因此,这种方法不是很可扩展 --> 队列上的高争用会产生大量的上下文切换和 CPU-business.
实施: 在 ForkJoinPool 中,每个线程都有一个单独的双端队列 (Deque),由数组支持。为了最大限度地减少争用,工作窃取 发生在双端队列的尾部,而任务提交发生在当前线程(工作者)的头部。尾部包含最大的工作部分。换句话说,另一个工作线程从尾部窃取最小化与其他工作线程交互的次数 --> 更少的争用,更好的整体性能。
Doug Lea 在官方白皮书 "Java Fork/Join Framework" 中描述了这个想法。
可扩展性基准显示在“让它崩溃 - Fork Join Pool的可扩展性
变通思路: 有全局提交队列。来自非 FJ 线程的提交进入提交队列(Workers 承担这些任务)。还有上面提到的Worker-queues。
队列的最大大小受数量限制:
/**
* Maximum size for queue arrays. Must be a power of two less
* than or equal to 1 << (31 - width of array entry) to ensure
* lack of wraparound of index calculations, but defined to a
* value a bit less than this to help users trap runaway
* programs before saturating systems.
*/
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
当队列满时抛出未经检查的异常:
RejectedExecutionException("Queue capacity exceeded")
java文档中对此进行了描述。
(另外,请参阅 UncaughtExceptionHandler
的 ThreadPool 构造函数)
我倾向于声称当前的实现没有这样的机制,这应该由我们在消费 API 中实现。
例如,可以按如下方式完成:
- 实施指数退避逻辑,尝试通过增加下一次重试的时间间隔来定期重新提交任务。 或者..
- 编写一个定期检查 submissionQueue 大小的节流器(参见
ForkJoinPool.getQueuedSubmissionCount()
)。
Here 的官方 JSR-166E java ForkJoinPool 代码以获取更多信息。