为什么使用的线程数高于要求?
Why are the number of threads used are higher than required?
我有一个 SpringBoot 应用程序,我允许最多 45 个并发请求。
现在,1 个请求 在其旅程中使用 threadPool A
并行调用 16 个外部服务。所以记住平均情况和最坏情况,我一直遵循它的配置:
ThreadPoolTaskExecutor A = new ThreadPoolTaskExecutor();
A.setCorePoolSize(400);
A.setMaxPoolSize(1000);
A.setQueueCapacity(10);
A.setThreadNamePrefix("async-executor");
A.initialize();
我的预期是 最多使用 45*16 = 720 个线程。但是在 运行 负载测试中,我观察到线程不断打开(在线程转储中检查),几分钟后它开始给出 RejectedExecutionException。
RejectedExecutionException
Task ServiceX rejected from org.springframework.scheduling.concurrent.
ThreadPoolTaskExecutor@4221a19e[Running, pool
size = 1000, active threads = 2, queued tasks = 10, completed tasks = 625216]
线程转储中显示的大多数线程
"executor-A-57" #579 prio=5 os_prio=0 tid=0x000000000193f800 nid=0x2e95 waiting on condition [0x00007fa9e820c000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000582dadf90> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1073)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers:
- None
我想知道我在这里缺少什么?为什么我会被拒绝?
编辑:
我试图在一小段代码上复制类似的东西,这里是:
MainClass 运行一个长循环。在每个循环中,它调用 service1 3 次。现在我有演示服务,里面只有相同的代码Thread.sleep(100)
。
MainClass.java
package com.flappy.everything.threadpooling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class MainClass {
private static ThreadPoolTaskExecutor getExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setMaxPoolSize(4);
threadPoolTaskExecutor.setThreadNamePrefix("async-exec");
threadPoolTaskExecutor.setCorePoolSize(4);
threadPoolTaskExecutor.setQueueCapacity(2);
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
ThreadPoolTaskExecutor outerExecutor = getExecutor();
List<Service1> services = Arrays.asList(new Service1(), new Service1(), new Service1());
for (int i = 0; i < 1000000; i++) {
List<Future> futures = new ArrayList<>();
for (Service1 service : services) {
futures.add(outerExecutor.submit(() -> {
try {
service.set();
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
}
for (Future future : futures) {
future.get();
}
}
}
}
Service1.java
package com.oyorooms.everything.threadpooling;
import org.springframework.scheduling.annotation.Async;
public class Service1 {
public void set() throws InterruptedException {
Thread.sleep(100);
System.out.println(Thread.currentThread().getName());
}
}
所以理想情况下,应该只为我提供的线程池打开 3 个线程,但我仍然在 运行 代码上收到拒绝。
我建议通过添加 1 个输出任务执行程序的记录器行来对此进行测试,然后 运行 对不同的 16 次调用和 45 次请求进行计数。可能会发生很多事情。
- 也许 ThreadPoolTaskExecutor 不是一个 bean,spring 而是选择另一个在您的应用程序中配置的 bean。
也许应用的其他部分也在使用异步调用
永远循环的代码中可能有一些错误
等...
但是,如果您没有单元测试,一个好的开始是简单地记录正在发生的事情并分析您的日志。
这很有趣。
您列出的代码失败的原因是将元素从工作队列传输到工作线程所花费的时间比主线程将项目放入队列所花费的时间慢。
流程是这样的:
if(there are active threads and is there availability on the queue){
submit to the work queue for the worker threads to pick up // 1
} else {
if(max pool size is not met){
create a new thread with this task being its first task // 2
} else {
reject // 3
}
}
您看到的是代码命中 // 3
。
当您第一次提交任务时,线程数将小于最大池大小。第一轮提交的任务将达到 // 2
.
第一次迭代后,活动线程数将为最大池大小,代码将尝试提交到 // 1
。
假设主线程非常非常快地将 3 个项目放入队列,因此 ThreadPool 中的 4 个线程无法足够快地取出一个。如果发生这种情况,我们将传递第一个 if 语句(因为队列中没有可用性)并转到 else。由于已经达到最大池大小,因此除了 reject
.
别无他法。
这可以通过检查 ThreadPoolExecutor Javadocs 来进一步解释。
If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.
以后
Direct handoffs generally require unbounded maximumPoolSizes to avoid rejection of new submitted tasks. This in turn admits the possibility of unbounded thread growth when commands continue to arrive on average faster than they can be processed.
要解决您的问题,您有两个合理的选择:
使用一个SynchronousQueue。提供给 SynchronousQueue 的线程将无限期地等待,直到另一个线程获取该项目(如果它知道另一个线程正在等待接收它)。如果放置不成功(即,另一个线程不会立即将其关闭),您定义的固定队列大小将导致主线程 return(无阻塞)。要使用 Spring 使用 SynchronousQueue,请将队列容量设置为零。 setQueueCapacity(0)
。
同样来自 Javadocs
A good default choice for a work queue is a SynchronousQueue that hands off tasks to threads without otherwise holding them.
将队列大小设置为大于或等于您希望提交的并发任务数。一般情况下队列的大小可能不会达到那个大小,但它会在将来保护你。
我有一个 SpringBoot 应用程序,我允许最多 45 个并发请求。
现在,1 个请求 在其旅程中使用 threadPool A
并行调用 16 个外部服务。所以记住平均情况和最坏情况,我一直遵循它的配置:
ThreadPoolTaskExecutor A = new ThreadPoolTaskExecutor();
A.setCorePoolSize(400);
A.setMaxPoolSize(1000);
A.setQueueCapacity(10);
A.setThreadNamePrefix("async-executor");
A.initialize();
我的预期是 最多使用 45*16 = 720 个线程。但是在 运行 负载测试中,我观察到线程不断打开(在线程转储中检查),几分钟后它开始给出 RejectedExecutionException。
RejectedExecutionException
Task ServiceX rejected from org.springframework.scheduling.concurrent.
ThreadPoolTaskExecutor@4221a19e[Running, pool
size = 1000, active threads = 2, queued tasks = 10, completed tasks = 625216]
线程转储中显示的大多数线程
"executor-A-57" #579 prio=5 os_prio=0 tid=0x000000000193f800 nid=0x2e95 waiting on condition [0x00007fa9e820c000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000582dadf90> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1073)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers:
- None
我想知道我在这里缺少什么?为什么我会被拒绝?
编辑: 我试图在一小段代码上复制类似的东西,这里是:
MainClass 运行一个长循环。在每个循环中,它调用 service1 3 次。现在我有演示服务,里面只有相同的代码Thread.sleep(100)
。
MainClass.java
package com.flappy.everything.threadpooling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class MainClass {
private static ThreadPoolTaskExecutor getExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setMaxPoolSize(4);
threadPoolTaskExecutor.setThreadNamePrefix("async-exec");
threadPoolTaskExecutor.setCorePoolSize(4);
threadPoolTaskExecutor.setQueueCapacity(2);
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
ThreadPoolTaskExecutor outerExecutor = getExecutor();
List<Service1> services = Arrays.asList(new Service1(), new Service1(), new Service1());
for (int i = 0; i < 1000000; i++) {
List<Future> futures = new ArrayList<>();
for (Service1 service : services) {
futures.add(outerExecutor.submit(() -> {
try {
service.set();
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
}
for (Future future : futures) {
future.get();
}
}
}
}
Service1.java
package com.oyorooms.everything.threadpooling;
import org.springframework.scheduling.annotation.Async;
public class Service1 {
public void set() throws InterruptedException {
Thread.sleep(100);
System.out.println(Thread.currentThread().getName());
}
}
所以理想情况下,应该只为我提供的线程池打开 3 个线程,但我仍然在 运行 代码上收到拒绝。
我建议通过添加 1 个输出任务执行程序的记录器行来对此进行测试,然后 运行 对不同的 16 次调用和 45 次请求进行计数。可能会发生很多事情。
- 也许 ThreadPoolTaskExecutor 不是一个 bean,spring 而是选择另一个在您的应用程序中配置的 bean。
也许应用的其他部分也在使用异步调用
永远循环的代码中可能有一些错误
等...
但是,如果您没有单元测试,一个好的开始是简单地记录正在发生的事情并分析您的日志。
这很有趣。
您列出的代码失败的原因是将元素从工作队列传输到工作线程所花费的时间比主线程将项目放入队列所花费的时间慢。
流程是这样的:
if(there are active threads and is there availability on the queue){
submit to the work queue for the worker threads to pick up // 1
} else {
if(max pool size is not met){
create a new thread with this task being its first task // 2
} else {
reject // 3
}
}
您看到的是代码命中 // 3
。
当您第一次提交任务时,线程数将小于最大池大小。第一轮提交的任务将达到 // 2
.
第一次迭代后,活动线程数将为最大池大小,代码将尝试提交到 // 1
。
假设主线程非常非常快地将 3 个项目放入队列,因此 ThreadPool 中的 4 个线程无法足够快地取出一个。如果发生这种情况,我们将传递第一个 if 语句(因为队列中没有可用性)并转到 else。由于已经达到最大池大小,因此除了 reject
.
这可以通过检查 ThreadPoolExecutor Javadocs 来进一步解释。
If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.
以后
Direct handoffs generally require unbounded maximumPoolSizes to avoid rejection of new submitted tasks. This in turn admits the possibility of unbounded thread growth when commands continue to arrive on average faster than they can be processed.
要解决您的问题,您有两个合理的选择:
使用一个SynchronousQueue。提供给 SynchronousQueue 的线程将无限期地等待,直到另一个线程获取该项目(如果它知道另一个线程正在等待接收它)。如果放置不成功(即,另一个线程不会立即将其关闭),您定义的固定队列大小将导致主线程 return(无阻塞)。要使用 Spring 使用 SynchronousQueue,请将队列容量设置为零。
setQueueCapacity(0)
。 同样来自 JavadocsA good default choice for a work queue is a SynchronousQueue that hands off tasks to threads without otherwise holding them.
将队列大小设置为大于或等于您希望提交的并发任务数。一般情况下队列的大小可能不会达到那个大小,但它会在将来保护你。