扩展线程然后对任务进行排队的 ExecutorService

ExecutorService that scales threads then queues tasks

是否有一个 ExecutorService 实现类似于具有以下特征的线程池?

  1. 始终至少有 X 个活动线程。
  2. 如果提交了一个任务,并且所有活动线程都忙,则启动一个新线程,最多Y个线程。
  3. 如果一个任务被提交并且所有Y线程都忙,则任务被排队。
  4. 如果没有提交新任务,池将缩减为 X 个活动线程。

非常标准的线程池行为。您认为 ThreadPoolExecutor 可以处理这个问题,但是

executorService = new ThreadPoolExecutor(
    2, 10, // min/max threads
    60, TimeUnit.SECONDS, // time of inactivity before scaling back
    new SynchronousQueue<Runnable>()); // task queue

提交超过10个任务会抛出异常。切换到 LinkedBlockingQueue 将永远不会超过两个最小线程,除非你像 new LinkedBlockingQueue<Runnable>(20) 那样限制大小,在这种情况下,将有两个线程处理 1-20 个任务,2-10 个线程处理 21 -30个任务,超过30个任务异常。不漂亮。同时,固定线程池永远不会缩减非活动线程。

那么,为了得到我想要的东西,我可以使用另一种 BlockingQueue 或 fiddle 以及我错过的其他设置吗?是否有另一个更适合(哪个?)的 ExceutorService 实现,或者我最好通过覆盖 ThreadPoolExecutorexecute() 方法来实现自己的实现?

不幸的是,答案是否定的。你可以用 jre 中的内容做的最好的事情是有效地没有线程地板,只有天花板。这可以通过 allowing core threads to timeout.

来完成
ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 10, 60, TimeUnit.Seconds, new LinkedBlockingQueue<Runnable>());
tpe.allowCoreThreadTimeOut(true);

因为核心大小是10,所以提交任务时会启动一个新线程,直到有10个线程处于活动状态。之后,任务将在 LinkedBlockingQueue 中排队。如果线程处于非活动状态 60 秒,它将终止。

您想要的行为可以通过编写 class 实现 BlockingQueue 和 RejectedExecutionHandler 来实现,它会在确定任务是应该添加到队列还是应该被拒绝之前检查 ThreadPoolExecutors 的当前状态。

实际上 SynchronousQueue 的大小基本上为零,因此当您提交超过 10 个任务(最大池大小)时,拒绝策略就会启动并且任务会被拒绝。此外,对于每个提交的任务,它都会获得一个线程并使用它。

当您使用大小为 20 的 LinkedBlockingQueue 时,如果已达到核心大小并且没有空闲线程,它将使任务排队。他们将继续排队,除非队列已满,在这种情况下,它将创建一个新线程用于新任务。当池大小达到最大值时,拒绝策略将用于任何新提交。

您希望每当提交任务并且核心(核心大小)中的所有线程都很忙时,您的任务应该获得一个新线程而不是在队列中等待,我认为这是不可能的。

这应该可以解决问题:

    ThreadPoolExecutor tpe = new ThreadPoolExecutor(1, maxPoolSize, 60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>());
    tpe.setRejectedExecutionHandler((
            Runnable r,
            ThreadPoolExecutor executor) -> {
        // this will block if the queue is full but not until the sun will rise in the west!
            try {
                if(!executor.getQueue().offer(r, 15, TimeUnit.MINUTES)){
                    throw new RejectedExecutionException("Will not wait infinitely for offer runnable to ThreadPoolExecutor");
                }
            } catch (InterruptedException e) {
                throw new RejectedExecutionException("Unable to put runnable to queue", e);
            }
        });