扩展线程然后对任务进行排队的 ExecutorService
ExecutorService that scales threads then queues tasks
是否有一个 ExecutorService
实现类似于具有以下特征的线程池?
- 始终至少有 X 个活动线程。
- 如果提交了一个任务,并且所有活动线程都忙,则启动一个新线程,最多Y个线程。
- 如果一个任务被提交并且所有Y线程都忙,则任务被排队。
- 如果没有提交新任务,池将缩减为 X 个活动线程。
非常标准的线程池行为。您认为 ThreadPoolExecutor
可以处理这个问题,但是
executorService = new ThreadPoolExecutor(
2, 10, // min/max threads
60, TimeUnit.SECONDS, // time of inactivity before scaling back
new SynchronousQueue<Runnable>()); // task queue
提交超过10个任务会抛出异常。切换到 LinkedBlockingQueue
将永远不会超过两个最小线程,除非你像 new LinkedBlockingQueue<Runnable>(20)
那样限制大小,在这种情况下,将有两个线程处理 1-20 个任务,2-10 个线程处理 21 -30个任务,超过30个任务异常。不漂亮。同时,固定线程池永远不会缩减非活动线程。
那么,为了得到我想要的东西,我可以使用另一种 BlockingQueue
或 fiddle 以及我错过的其他设置吗?是否有另一个更适合(哪个?)的 ExceutorService
实现,或者我最好通过覆盖 ThreadPoolExecutor
的 execute()
方法来实现自己的实现?
不幸的是,答案是否定的。你可以用 jre 中的内容做的最好的事情是有效地没有线程地板,只有天花板。这可以通过 allowing core threads to timeout.
来完成
ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 10, 60, TimeUnit.Seconds, new LinkedBlockingQueue<Runnable>());
tpe.allowCoreThreadTimeOut(true);
因为核心大小是10,所以提交任务时会启动一个新线程,直到有10个线程处于活动状态。之后,任务将在 LinkedBlockingQueue 中排队。如果线程处于非活动状态 60 秒,它将终止。
您想要的行为可以通过编写 class 实现 BlockingQueue 和 RejectedExecutionHandler 来实现,它会在确定任务是应该添加到队列还是应该被拒绝之前检查 ThreadPoolExecutors 的当前状态。
实际上 SynchronousQueue
的大小基本上为零,因此当您提交超过 10 个任务(最大池大小)时,拒绝策略就会启动并且任务会被拒绝。此外,对于每个提交的任务,它都会获得一个线程并使用它。
当您使用大小为 20 的 LinkedBlockingQueue
时,如果已达到核心大小并且没有空闲线程,它将使任务排队。他们将继续排队,除非队列已满,在这种情况下,它将创建一个新线程用于新任务。当池大小达到最大值时,拒绝策略将用于任何新提交。
您希望每当提交任务并且核心(核心大小)中的所有线程都很忙时,您的任务应该获得一个新线程而不是在队列中等待,我认为这是不可能的。
这应该可以解决问题:
ThreadPoolExecutor tpe = new ThreadPoolExecutor(1, maxPoolSize, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
tpe.setRejectedExecutionHandler((
Runnable r,
ThreadPoolExecutor executor) -> {
// this will block if the queue is full but not until the sun will rise in the west!
try {
if(!executor.getQueue().offer(r, 15, TimeUnit.MINUTES)){
throw new RejectedExecutionException("Will not wait infinitely for offer runnable to ThreadPoolExecutor");
}
} catch (InterruptedException e) {
throw new RejectedExecutionException("Unable to put runnable to queue", e);
}
});
是否有一个 ExecutorService
实现类似于具有以下特征的线程池?
- 始终至少有 X 个活动线程。
- 如果提交了一个任务,并且所有活动线程都忙,则启动一个新线程,最多Y个线程。
- 如果一个任务被提交并且所有Y线程都忙,则任务被排队。
- 如果没有提交新任务,池将缩减为 X 个活动线程。
非常标准的线程池行为。您认为 ThreadPoolExecutor
可以处理这个问题,但是
executorService = new ThreadPoolExecutor(
2, 10, // min/max threads
60, TimeUnit.SECONDS, // time of inactivity before scaling back
new SynchronousQueue<Runnable>()); // task queue
提交超过10个任务会抛出异常。切换到 LinkedBlockingQueue
将永远不会超过两个最小线程,除非你像 new LinkedBlockingQueue<Runnable>(20)
那样限制大小,在这种情况下,将有两个线程处理 1-20 个任务,2-10 个线程处理 21 -30个任务,超过30个任务异常。不漂亮。同时,固定线程池永远不会缩减非活动线程。
那么,为了得到我想要的东西,我可以使用另一种 BlockingQueue
或 fiddle 以及我错过的其他设置吗?是否有另一个更适合(哪个?)的 ExceutorService
实现,或者我最好通过覆盖 ThreadPoolExecutor
的 execute()
方法来实现自己的实现?
不幸的是,答案是否定的。你可以用 jre 中的内容做的最好的事情是有效地没有线程地板,只有天花板。这可以通过 allowing core threads to timeout.
来完成ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 10, 60, TimeUnit.Seconds, new LinkedBlockingQueue<Runnable>());
tpe.allowCoreThreadTimeOut(true);
因为核心大小是10,所以提交任务时会启动一个新线程,直到有10个线程处于活动状态。之后,任务将在 LinkedBlockingQueue 中排队。如果线程处于非活动状态 60 秒,它将终止。
您想要的行为可以通过编写 class 实现 BlockingQueue 和 RejectedExecutionHandler 来实现,它会在确定任务是应该添加到队列还是应该被拒绝之前检查 ThreadPoolExecutors 的当前状态。
实际上 SynchronousQueue
的大小基本上为零,因此当您提交超过 10 个任务(最大池大小)时,拒绝策略就会启动并且任务会被拒绝。此外,对于每个提交的任务,它都会获得一个线程并使用它。
当您使用大小为 20 的 LinkedBlockingQueue
时,如果已达到核心大小并且没有空闲线程,它将使任务排队。他们将继续排队,除非队列已满,在这种情况下,它将创建一个新线程用于新任务。当池大小达到最大值时,拒绝策略将用于任何新提交。
您希望每当提交任务并且核心(核心大小)中的所有线程都很忙时,您的任务应该获得一个新线程而不是在队列中等待,我认为这是不可能的。
这应该可以解决问题:
ThreadPoolExecutor tpe = new ThreadPoolExecutor(1, maxPoolSize, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
tpe.setRejectedExecutionHandler((
Runnable r,
ThreadPoolExecutor executor) -> {
// this will block if the queue is full but not until the sun will rise in the west!
try {
if(!executor.getQueue().offer(r, 15, TimeUnit.MINUTES)){
throw new RejectedExecutionException("Will not wait infinitely for offer runnable to ThreadPoolExecutor");
}
} catch (InterruptedException e) {
throw new RejectedExecutionException("Unable to put runnable to queue", e);
}
});