如何捕获 CompletableFuture 的 whenCompleteAsync 调用中抛出的 RejectedExecutionException?
How can I capture the RejectedExecutionException thrown in a CompletableFuture's whenCompleteAsync invocation?
下面的示例代码我正在注入一个 biconsumer
休眠 100 毫秒作为一组可完成未来的完成动作。我通过提供一个单独的 executorService
来使用 whenCompleteAsync
方法。 executorService
是一个 ThreadPoolExecutor
,核心池大小为 5,最大大小为 5,队列长度为 1。
public class CompleteTest {
public static void main(String[] args) {
ExecutorService executorService = new ThreadPoolExecutor(5, 5, 10,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
ArrayList<CompletableFuture<String>> list = new ArrayList<>();
for (int i = 0; i <100; i++) {
CompletableFuture<String> stringCompletableFuture = new CompletableFuture<>();
stringCompletableFuture.whenCompleteAsync((e, a) -> {
System.out.println("Complete " + e);
try {
Thread.sleep(100);
} catch (InterruptedException e1) {e1.printStackTrace();}
}, executorService);
list.add(stringCompletableFuture);
}
for (int i = 0; i < list.size(); i++) {
list.get(i).complete(i + "");
}
}
}
当我 运行 代码时,即使我完成了 100 个期货,也只打印了 6 个输出。那是 5 个核心线程和 1 个排队线程。剩下的怎么办?如果其他runnable因为队列已满而无法提交给executor服务,应该不会有异常吧?
输出
Complete 0
Complete 1
Complete 2
Complete 3
Complete 4
Complete 5
抛出异常,并且 CompletableFuture
异常完成,只是不是您正在跟踪的任何一个。
您正在使用构造函数实例化和初始化 ThreadPoolExecutor
,该构造函数使用默认的 RejectedExecutionHandler
,只会引发异常。我们知道,如果 ExecutorService
无法接受任务,则会抛出 RejectedExecutionException
。那么任务添加到哪里,异常又抛到哪里呢?
就目前而言,所有链接都发生在 whenCompleteAsync
内。当你调用它时,你向接收者添加了一个依赖项 CompletableFuture
,stringCompletableFuture
。当 stringCompletableFuture
完成时(在本例中成功),它将创建一个新的 CompletableFuture
(它是 returns)并尝试在给定的 BiConsumer
上安排给定的 BiConsumer
=16=]。
由于 ExecutorService
的队列没有 space,它会调用 RejectedExecutionHandler
,这将抛出 RejectedExecutionException
。该异常在那时被捕获并用于 completeExceptionally
将返回的 CompletableFuture
。
换句话说,在你的 for
循环中,捕获 whenCompleteAsync
返回的 CompletableFuture
,存储它,并打印出它的状态。
ArrayList<CompletableFuture<String>> list = new ArrayList<>();
ArrayList<CompletableFuture<?>> dependents = new ArrayList<>();
for (int i = 0; i <100; i++) {
CompletableFuture<String> stringCompletableFuture = new CompletableFuture<>();
CompletableFuture<?> thisWillHaveException = stringCompletableFuture.whenCompleteAsync((e, a) -> {
System.out.println("Complete " + e);
try {
Thread.sleep(100);
} catch (InterruptedException e1) {e1.printStackTrace();}
}, executorService);
dependents.add(thisWillHaveException);
list.add(stringCompletableFuture);
}
for (int i = 0; i < list.size(); i++) {
list.get(i).complete(i + "");
}
Thread.sleep(2000);
dependents.forEach(cf -> {
cf.whenComplete((r, e) -> {
if (e != null)
System.out.println(cf + " " + e.getMessage());
});
});
您会注意到它们(除了之前成功打印的 6 个)都异常完成,并且 RejectedExecutionException
。
...
java.util.concurrent.CompletableFuture@2d8e6db6[Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$UniWhenComplete@3f91beef rejected from java.util.concurrent.ThreadPoolExecutor@4eec7777[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
java.util.concurrent.CompletableFuture@23ab930d[Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$UniWhenComplete@1a6c5a9e rejected from java.util.concurrent.ThreadPoolExecutor@4eec7777[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
java.util.concurrent.CompletableFuture@4534b60d[Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$UniWhenComplete@37bba400 rejected from java.util.concurrent.ThreadPoolExecutor@4eec7777[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
下面的示例代码我正在注入一个 biconsumer
休眠 100 毫秒作为一组可完成未来的完成动作。我通过提供一个单独的 executorService
来使用 whenCompleteAsync
方法。 executorService
是一个 ThreadPoolExecutor
,核心池大小为 5,最大大小为 5,队列长度为 1。
public class CompleteTest {
public static void main(String[] args) {
ExecutorService executorService = new ThreadPoolExecutor(5, 5, 10,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
ArrayList<CompletableFuture<String>> list = new ArrayList<>();
for (int i = 0; i <100; i++) {
CompletableFuture<String> stringCompletableFuture = new CompletableFuture<>();
stringCompletableFuture.whenCompleteAsync((e, a) -> {
System.out.println("Complete " + e);
try {
Thread.sleep(100);
} catch (InterruptedException e1) {e1.printStackTrace();}
}, executorService);
list.add(stringCompletableFuture);
}
for (int i = 0; i < list.size(); i++) {
list.get(i).complete(i + "");
}
}
}
当我 运行 代码时,即使我完成了 100 个期货,也只打印了 6 个输出。那是 5 个核心线程和 1 个排队线程。剩下的怎么办?如果其他runnable因为队列已满而无法提交给executor服务,应该不会有异常吧?
输出
Complete 0
Complete 1
Complete 2
Complete 3
Complete 4
Complete 5
抛出异常,并且 CompletableFuture
异常完成,只是不是您正在跟踪的任何一个。
您正在使用构造函数实例化和初始化 ThreadPoolExecutor
,该构造函数使用默认的 RejectedExecutionHandler
,只会引发异常。我们知道,如果 ExecutorService
无法接受任务,则会抛出 RejectedExecutionException
。那么任务添加到哪里,异常又抛到哪里呢?
就目前而言,所有链接都发生在 whenCompleteAsync
内。当你调用它时,你向接收者添加了一个依赖项 CompletableFuture
,stringCompletableFuture
。当 stringCompletableFuture
完成时(在本例中成功),它将创建一个新的 CompletableFuture
(它是 returns)并尝试在给定的 BiConsumer
上安排给定的 BiConsumer
=16=]。
由于 ExecutorService
的队列没有 space,它会调用 RejectedExecutionHandler
,这将抛出 RejectedExecutionException
。该异常在那时被捕获并用于 completeExceptionally
将返回的 CompletableFuture
。
换句话说,在你的 for
循环中,捕获 whenCompleteAsync
返回的 CompletableFuture
,存储它,并打印出它的状态。
ArrayList<CompletableFuture<String>> list = new ArrayList<>();
ArrayList<CompletableFuture<?>> dependents = new ArrayList<>();
for (int i = 0; i <100; i++) {
CompletableFuture<String> stringCompletableFuture = new CompletableFuture<>();
CompletableFuture<?> thisWillHaveException = stringCompletableFuture.whenCompleteAsync((e, a) -> {
System.out.println("Complete " + e);
try {
Thread.sleep(100);
} catch (InterruptedException e1) {e1.printStackTrace();}
}, executorService);
dependents.add(thisWillHaveException);
list.add(stringCompletableFuture);
}
for (int i = 0; i < list.size(); i++) {
list.get(i).complete(i + "");
}
Thread.sleep(2000);
dependents.forEach(cf -> {
cf.whenComplete((r, e) -> {
if (e != null)
System.out.println(cf + " " + e.getMessage());
});
});
您会注意到它们(除了之前成功打印的 6 个)都异常完成,并且 RejectedExecutionException
。
...
java.util.concurrent.CompletableFuture@2d8e6db6[Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$UniWhenComplete@3f91beef rejected from java.util.concurrent.ThreadPoolExecutor@4eec7777[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
java.util.concurrent.CompletableFuture@23ab930d[Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$UniWhenComplete@1a6c5a9e rejected from java.util.concurrent.ThreadPoolExecutor@4eec7777[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
java.util.concurrent.CompletableFuture@4534b60d[Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$UniWhenComplete@37bba400 rejected from java.util.concurrent.ThreadPoolExecutor@4eec7777[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]