Java 动态缩放线程数的ExecutorService
Java ExecutorService that dynamically scale the number of threads
我有一个工作单元列表,我想并行处理它们。每个单元工作 8-15 秒,完全计算时间,没有 I/O 阻塞。我想要实现的是 ExecutorService
是:
- 没有工作可做时实例化了零个线程
- 如果需要可以动态扩展到 20 个线程
- 允许我一次添加所有工作单元(不阻止提交)
类似于:
Queue<WorkResult> queue = new ConcurrentLinkedDeque<>();
ExecutorService service = ....
for(WorkUnit unit : list) {
service.submit(() -> {
.. do some work ..
queue.offer(result);
);
}
while(queue.peek() != null) {
... process results while they arrive ...
}
我试过没有成功的是:
- 使用
newCachedThreadPool()
创建过多线程
- 然后我使用了它的内部调用
new ThreadPoolExecutor(0, 20, 60L, SECONDS, new SynchronousQueue<>())
,但后来我注意到 submit() 由于同步队列而阻塞
- 所以我使用了
new LinkedBlockingQueue()
,只是为了发现 ThreadPoolExecutor 只生成一个线程
我确定有官方实现来处理这个非常基本的并发用例。
有人可以建议吗?
使用 LinkedBlockingQueue
和 20
作为 corePoolSize
创建 ThreadPoolExecutor
( 构造函数中的第一个参数 ):
new ThreadPoolExecutor(20, 20, 60L, SECONDS, new LinkedBlockingQueue<>());
如果您使用LinkedBlockingQueue
没有预定义的容量,Pool
:
- Won't 曾经检查过
maxPoolSize
.
- 不会创建比
corePoolSize
的指定数量更多的线程。
在您的情况下,只会执行一个线程。你很幸运能得到一个,因为你将它设置为 0
,如果 corePoolSize
设置为 0
, 将不会创建任何内容 (他们怎么敢?).
即使 corePoolSize
是 0
,更多版本确实会创建一个新线程,这似乎是... 一个修复那是……一个错误……改变了……一个合乎逻辑的行为?.
Using an unbounded queue (for example a LinkedBlockingQueue
without a
predefined capacity) will cause new tasks to wait in the queue when
all corePoolSize threads are busy. Thus, no more than corePoolSize
threads will ever be created. (And the value of the maximumPoolSize
therefore doesn't have any effect.)
关于缩减
为了在无事可做时删除所有线程,您必须特别关闭 coreThreads
(默认情况下它们不会终止) .为此,在启动 Pool
.
之前设置 allowCoreThreadTimeOut(true)
注意设置正确的 keep-alive
超时:例如,如果平均每 6 秒收到一个新任务,将保持活动时间设置为 5 秒 可以 导致不必要的 erase+create 操作(哦亲爱的线程,你只需要等待一秒钟!)。根据任务接收收入速度设置这个超时时间。
Sets the policy governing whether core threads may time out and
terminate if no tasks arrive within the keep-alive time, being
replaced if needed when new tasks arrive. When false, core threads are
never terminated due to lack of incoming tasks. When true, the same
keep-alive policy applying to non-core threads applies also to core
threads. To avoid continual thread replacement, the keep-alive time
must be greater than zero when setting true. This method should in
general be called before the pool is actively used.
TL/DR
- 无界
LinkedBloquingQueue
作为任务队列。
corePoolSize
替换 maxPoolSize
的 意思 .
allowCoreThreadTimeOut(true)
为了让 Pool
缩小 使用基于超时的机制也会影响 coreThreads
.
keep-alive
值设置为 一些基于任务接收延迟的逻辑值 。
这种新鲜组合将导致 ExecutorService
99,99999% 的时间 不会阻止提交者 (要做到这一点,排队的任务数应该是2.147.483.647),并且有效地 缩放工作负载基础中的线程数量 ,在 [=38 之间波动 (双向) =]个并发线程。
作为一个建议,应该监控队列的大小,因为非阻塞行为是有代价的:如果它在不受控制的情况下持续增长,直到 INTEGER.MAX_VALUE
时,出现 OOM
异常的概率是遇到(f.e:如果线程死锁一整天而提交者不断插入任务)。 即使任务在内存中的大小可能很小,2.147.483.647 个对象及其相应的 link 包装器等...也是很多额外负载。
最简单的方法是使用方法
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
的class Executors。这为您提供了一个简单的开箱即用解决方案。您获得的池将根据需要扩大和缩小。您可以使用处理核心线程超时等的方法进一步配置它。
ScheduledExecutorService 是 ExecutorService class 的扩展,并且是唯一开箱即用的可以动态扩展和收缩的。
我有一个工作单元列表,我想并行处理它们。每个单元工作 8-15 秒,完全计算时间,没有 I/O 阻塞。我想要实现的是 ExecutorService
是:
- 没有工作可做时实例化了零个线程
- 如果需要可以动态扩展到 20 个线程
- 允许我一次添加所有工作单元(不阻止提交)
类似于:
Queue<WorkResult> queue = new ConcurrentLinkedDeque<>();
ExecutorService service = ....
for(WorkUnit unit : list) {
service.submit(() -> {
.. do some work ..
queue.offer(result);
);
}
while(queue.peek() != null) {
... process results while they arrive ...
}
我试过没有成功的是:
- 使用
newCachedThreadPool()
创建过多线程 - 然后我使用了它的内部调用
new ThreadPoolExecutor(0, 20, 60L, SECONDS, new SynchronousQueue<>())
,但后来我注意到 submit() 由于同步队列而阻塞 - 所以我使用了
new LinkedBlockingQueue()
,只是为了发现 ThreadPoolExecutor 只生成一个线程
我确定有官方实现来处理这个非常基本的并发用例。 有人可以建议吗?
使用 LinkedBlockingQueue
和 20
作为 corePoolSize
创建 ThreadPoolExecutor
( 构造函数中的第一个参数 ):
new ThreadPoolExecutor(20, 20, 60L, SECONDS, new LinkedBlockingQueue<>());
如果您使用LinkedBlockingQueue
没有预定义的容量,Pool
:
- Won't 曾经检查过
maxPoolSize
. - 不会创建比
corePoolSize
的指定数量更多的线程。
在您的情况下,只会执行一个线程。你很幸运能得到一个,因为你将它设置为 0
,如果 corePoolSize
设置为 0
,
即使 corePoolSize
是 0
,更多版本确实会创建一个新线程,这似乎是... 一个修复那是……一个错误……改变了……一个合乎逻辑的行为?.
Using an unbounded queue (for example a
LinkedBlockingQueue
without a predefined capacity) will cause new tasks to wait in the queue when all corePoolSize threads are busy. Thus, no more than corePoolSize threads will ever be created. (And the value of the maximumPoolSize therefore doesn't have any effect.)
关于缩减
为了在无事可做时删除所有线程,您必须特别关闭 coreThreads
(默认情况下它们不会终止) .为此,在启动 Pool
.
allowCoreThreadTimeOut(true)
注意设置正确的 keep-alive
超时:例如,如果平均每 6 秒收到一个新任务,将保持活动时间设置为 5 秒 可以 导致不必要的 erase+create 操作(哦亲爱的线程,你只需要等待一秒钟!)。根据任务接收收入速度设置这个超时时间。
Sets the policy governing whether core threads may time out and terminate if no tasks arrive within the keep-alive time, being replaced if needed when new tasks arrive. When false, core threads are never terminated due to lack of incoming tasks. When true, the same keep-alive policy applying to non-core threads applies also to core threads. To avoid continual thread replacement, the keep-alive time must be greater than zero when setting true. This method should in general be called before the pool is actively used.
TL/DR
- 无界
LinkedBloquingQueue
作为任务队列。 corePoolSize
替换maxPoolSize
的 意思 .allowCoreThreadTimeOut(true)
为了让Pool
缩小 使用基于超时的机制也会影响coreThreads
.keep-alive
值设置为 一些基于任务接收延迟的逻辑值 。
这种新鲜组合将导致 ExecutorService
99,99999% 的时间 不会阻止提交者 (要做到这一点,排队的任务数应该是2.147.483.647),并且有效地 缩放工作负载基础中的线程数量 ,在 [=38 之间波动 (双向) =]个并发线程。
作为一个建议,应该监控队列的大小,因为非阻塞行为是有代价的:如果它在不受控制的情况下持续增长,直到 INTEGER.MAX_VALUE
时,出现 OOM
异常的概率是遇到(f.e:如果线程死锁一整天而提交者不断插入任务)。 即使任务在内存中的大小可能很小,2.147.483.647 个对象及其相应的 link 包装器等...也是很多额外负载。
最简单的方法是使用方法
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
的class Executors。这为您提供了一个简单的开箱即用解决方案。您获得的池将根据需要扩大和缩小。您可以使用处理核心线程超时等的方法进一步配置它。
ScheduledExecutorService 是 ExecutorService class 的扩展,并且是唯一开箱即用的可以动态扩展和收缩的。