如何在 ForkJoinPool 中阻塞队列?

How to Block a Queue in ForkJoinPool?

当 ForkJoinPool 的队列已满时,我需要阻塞线程。 这可以在标准的 ThreadPoolExecutor 中完成,例如:

private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
    return new ThreadPoolExecutor(nThreads, nThreads,
            5000L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
}

我知道,ForkJoinPool 中有一些 Dequeue,但我无法通过其 API 访问它。

更新:请看下面的答案。

经过一些研究,我很乐意回答这个问题:

原因: 由于以下原因,ForkJoinPool 的实现中没有这样的选项。 大多数j.u.c。执行者假定单个并发队列和许多线程。当多个线程 reading/writing 进入队列时,这会导致队列争用并降低性能。因此,这种方法不是很可扩展 --> 队列上的高争用会产生大量的上下文切换和 CPU-business.

实施: 在 ForkJoinPool 中,每个线程都有一个单独的双端队列 (Deque),由数组支持。为了最大限度地减少争用,工作窃取 发生在双端队列的尾部,而任务提交发生在当前线程(工作者)的头部。尾部包含最大的工作部分。换句话说,另一个工作线程从尾部窃取最小化与其他工作线程交互的次数 --> 更少的争用,更好的整体性能。

变通思路: 有全局提交队列。来自非 FJ 线程的提交进入提交队列(Workers 承担这些任务)。还有上面提到的Worker-queues。

队列的最大大小受数量限制:

   /**
     * Maximum size for queue arrays. Must be a power of two less
     * than or equal to 1 << (31 - width of array entry) to ensure
     * lack of wraparound of index calculations, but defined to a
     * value a bit less than this to help users trap runaway
     * programs before saturating systems.
     */
    static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

当队列满时抛出未经检查的异常:

RejectedExecutionException("Queue capacity exceeded")

java文档中对此进行了描述。

(另外,请参阅 UncaughtExceptionHandler 的 ThreadPool 构造函数)

我倾向于声称当前的实现没有这样的机制,这应该由我们在消费 API 中实现。

例如,可以按如下方式完成:

  1. 实施指数退避逻辑,尝试通过增加下一次重试的时间间隔来定期重新提交任务。 或者..
  2. 编写一个定期检查 submissionQueue 大小的节流器(参见 ForkJoinPool.getQueuedSubmissionCount())。

Here 的官方 JSR-166E java ForkJoinPool 代码以获取更多信息。