Java 动态缩放线程数的ExecutorService

Java ExecutorService that dynamically scale the number of threads

我有一个工作单元列表,我想并行处理它们。每个单元工作 8-15 秒,完全计算时间,没有 I/O 阻塞。我想要实现的是 ExecutorService 是:

类似于:

Queue<WorkResult> queue = new ConcurrentLinkedDeque<>();
ExecutorService service = ....
for(WorkUnit unit : list) {
    service.submit(() -> {
        .. do some work ..
        queue.offer(result);
    );
}
while(queue.peek() != null) {
    ... process results while they arrive ...
}

我试过没有成功的是:

我确定有官方实现来处理这个非常基本的并发用例。 有人可以建议吗?

使用 LinkedBlockingQueue20 作为 corePoolSize 创建 ThreadPoolExecutor 构造函数中的第一个参数 ):

new ThreadPoolExecutor(20, 20, 60L, SECONDS, new LinkedBlockingQueue<>());


如果您使用LinkedBlockingQueue 没有预定义的容量Pool:

  • Won't 曾经检查过 maxPoolSize.
  • 不会创建比corePoolSize的指定数量更多的线程。

在您的情况下,只会执行一个线程。你很幸运能得到一个,因为你将它设置为 0,如果 corePoolSize 设置为 0 将不会创建任何内容 (他们怎么敢?).

即使 corePoolSize0,更多版本确实会创建一个新线程,这似乎是... 一个修复那是……一个错误……改变了……一个合乎逻辑的行为?.

Thread Pool Executor

Using an unbounded queue (for example a LinkedBlockingQueue without a predefined capacity) will cause new tasks to wait in the queue when all corePoolSize threads are busy. Thus, no more than corePoolSize threads will ever be created. (And the value of the maximumPoolSize therefore doesn't have any effect.)


关于缩减

为了在无事可做时删除所有线程,您必须特别关闭 coreThreads默认情况下它们不会终止) .为此,在启动 Pool.

之前设置 allowCoreThreadTimeOut(true)

注意设置正确的 keep-alive 超时:例如,如果平均每 6 秒收到一个新任务,将保持活动时间设置为 5 秒 可以 导致不必要的 erase+create 操作(哦亲爱的线程,你只需要等待一秒钟!)。根据任务接收收入速度设置这个超时时间。

allowCoreThreadTimeOut

Sets the policy governing whether core threads may time out and terminate if no tasks arrive within the keep-alive time, being replaced if needed when new tasks arrive. When false, core threads are never terminated due to lack of incoming tasks. When true, the same keep-alive policy applying to non-core threads applies also to core threads. To avoid continual thread replacement, the keep-alive time must be greater than zero when setting true. This method should in general be called before the pool is actively used.


TL/DR

  • 无界LinkedBloquingQueue作为任务队列。
  • corePoolSize 替换 maxPoolSize 意思 .
  • allowCoreThreadTimeOut(true) 为了让 Pool 缩小 使用基于超时的机制也会影响 coreThreads.
  • keep-alive 值设置为 一些基于任务接收延迟的逻辑值

这种新鲜组合将导致 ExecutorService 99,99999% 的时间 不会阻止提交者 (要做到这一点,排队的任务数应该是2.147.483.647,并且有效地 缩放工作负载基础中的线程数量 ,在 [=38 之间波动 (双向) =]个并发线程。

作为一个建议,应该监控队列的大小,因为非阻塞行为是有代价的:如果它在不受控制的情况下持续增长,直到 INTEGER.MAX_VALUE 时,出现 OOM 异常的概率是遇到(f.e:如果线程死锁一整天而提交者不断插入任务)即使任务在内存中的大小可能很小,2.147.483.647 个对象及其相应的 link 包装器等...也是很多额外负载。

最简单的方法是使用方法

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

的class Executors。这为您提供了一个简单的开箱即用解决方案。您获得的池将根据需要扩大和缩小。您可以使用处理核心线程超时等的方法进一步配置它。
ScheduledExecutorService 是 ExecutorService class 的扩展,并且是唯一开箱即用的可以动态扩展和收缩的。