单个执行程序服务中的 RejectedExecutionException
RejectedExecutionException inside single executor service
在我们的一项服务中,有人添加了这样(简化)的一段代码:
public class DeleteMe {
public static void main(String[] args) {
DeleteMe d = new DeleteMe();
for (int i = 0; i < 10_000; ++i) {
d.trigger(i);
}
}
private Future<?> trigger(int i) {
ExecutorService es = Executors.newSingleThreadExecutor();
Future<?> f = es.submit(() -> {
try {
// some long running task
Thread.sleep(10_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
return f;
}
}
这会失败有时:
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@3148f668 rejected from java.util.concurrent.ThreadPoolExecutor@6e005dc9[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at java.util.concurrent.Executors$DelegatedExecutorService.submit(Executors.java:678)
at com.erabii.so.DeleteMe.trigger(DeleteMe.java:29)
at com.erabii.so.DeleteMe.main(DeleteMe.java:22)
大多数时候错误是 OutOfMemoryError
- 我完全理解。编写代码的人从未调用过 ExecutorService::shutDown
,因此它一直保持活动状态。当然,为每个方法调用创建一个单独的执行器服务是不好的,将会被改变;但这正是看到错误的原因。
不明白的地方是为什么会抛出RejectedExecutionException
,具体是抛出here.
代码注释 there 有一定道理:
- If we cannot queue task, then we try to add a new thread. If it fails, we know we are shut down or saturated and so reject the task.
如果确实如此,execute
的文档怎么没有提到这个?
If the task cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been reached, the task is handled by the current RejectedExecutionHandler.
坦率地说,一开始我虽然 ExecutorService
是 GC-ed - 可达性和范围是不同的东西,GC 允许清除任何 not 可达的东西;但是有一个 Future<?>
将保留对该服务的强烈引用,因此我将其排除在外。
你写了
To be frank initially I though that ExecutorService
is GC-ed - reachability and scope are different things and GC is allowed to clear anything which is not reachable; but there is a Future<?>
that will keep a strong reference to that service, so I excluded this.
但这实际上是一个非常合理的场景,JDK-8145304中对此进行了描述。在错误报告的示例中,ExecutorService
未保存在局部变量中,但局部变量本身不会阻止垃圾收集。
注意异常信息
Task java.util.concurrent.FutureTask@3148f668 rejected from
java.util.concurrent.ThreadPoolExecutor@6e005dc9[Terminated,
pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
支持这一点,因为 ThreadPoolExecutor@6e005dc9
的状态被指定为 Terminated
。
期货持有对其创造的参考的假设 ExecutorService
是错误的。实际类型取决于服务实现,但对于常见的类型,它将是 FutureTask
的实例,它没有引用 ExecutorService
。在异常消息中也可以看到这适用于您的情况。
即使它有引用,创建者也将是实际的 ThreadPoolExecutor
,但包装 FinalizableDelegatedExecutorService
实例会收集垃圾并在 [= 上调用 shutdown()
20=] 实例(精简包装器通常是优化代码中过早垃圾收集的良好候选者,它只是绕过包装)。
请注意,虽然错误报告仍未解决,但问题实际上已在 JDK 11 中得到解决。FinalizableDelegatedExecutorService
的基础 class、class DelegatedExecutorService
有一个 execute
实现,如下所示:
public void execute(Runnable command) {
try {
e.execute(command);
} finally { reachabilityFence(this); }
}
在我们的一项服务中,有人添加了这样(简化)的一段代码:
public class DeleteMe {
public static void main(String[] args) {
DeleteMe d = new DeleteMe();
for (int i = 0; i < 10_000; ++i) {
d.trigger(i);
}
}
private Future<?> trigger(int i) {
ExecutorService es = Executors.newSingleThreadExecutor();
Future<?> f = es.submit(() -> {
try {
// some long running task
Thread.sleep(10_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
return f;
}
}
这会失败有时:
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@3148f668 rejected from java.util.concurrent.ThreadPoolExecutor@6e005dc9[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at java.util.concurrent.Executors$DelegatedExecutorService.submit(Executors.java:678)
at com.erabii.so.DeleteMe.trigger(DeleteMe.java:29)
at com.erabii.so.DeleteMe.main(DeleteMe.java:22)
大多数时候错误是 OutOfMemoryError
- 我完全理解。编写代码的人从未调用过 ExecutorService::shutDown
,因此它一直保持活动状态。当然,为每个方法调用创建一个单独的执行器服务是不好的,将会被改变;但这正是看到错误的原因。
不明白的地方是为什么会抛出RejectedExecutionException
,具体是抛出here.
代码注释 there 有一定道理:
- If we cannot queue task, then we try to add a new thread. If it fails, we know we are shut down or saturated and so reject the task.
如果确实如此,execute
的文档怎么没有提到这个?
If the task cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been reached, the task is handled by the current RejectedExecutionHandler.
坦率地说,一开始我虽然 ExecutorService
是 GC-ed - 可达性和范围是不同的东西,GC 允许清除任何 not 可达的东西;但是有一个 Future<?>
将保留对该服务的强烈引用,因此我将其排除在外。
你写了
To be frank initially I though that
ExecutorService
is GC-ed - reachability and scope are different things and GC is allowed to clear anything which is not reachable; but there is aFuture<?>
that will keep a strong reference to that service, so I excluded this.
但这实际上是一个非常合理的场景,JDK-8145304中对此进行了描述。在错误报告的示例中,ExecutorService
未保存在局部变量中,但局部变量本身不会阻止垃圾收集。
注意异常信息
Task java.util.concurrent.FutureTask@3148f668 rejected from
java.util.concurrent.ThreadPoolExecutor@6e005dc9[Terminated,
pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
支持这一点,因为 ThreadPoolExecutor@6e005dc9
的状态被指定为 Terminated
。
期货持有对其创造的参考的假设 ExecutorService
是错误的。实际类型取决于服务实现,但对于常见的类型,它将是 FutureTask
的实例,它没有引用 ExecutorService
。在异常消息中也可以看到这适用于您的情况。
即使它有引用,创建者也将是实际的 ThreadPoolExecutor
,但包装 FinalizableDelegatedExecutorService
实例会收集垃圾并在 [= 上调用 shutdown()
20=] 实例(精简包装器通常是优化代码中过早垃圾收集的良好候选者,它只是绕过包装)。
请注意,虽然错误报告仍未解决,但问题实际上已在 JDK 11 中得到解决。FinalizableDelegatedExecutorService
的基础 class、class DelegatedExecutorService
有一个 execute
实现,如下所示:
public void execute(Runnable command) {
try {
e.execute(command);
} finally { reachabilityFence(this); }
}