如何捕获 CompletableFuture 的 whenCompleteAsync 调用中抛出的 RejectedExecutionException?

How can I capture the RejectedExecutionException thrown in a CompletableFuture's whenCompleteAsync invocation?

下面的示例代码我正在注入一个 biconsumer 休眠 100 毫秒作为一组可完成未来的完成动作。我通过提供一个单独的 executorService 来使用 whenCompleteAsync 方法。 executorService 是一个 ThreadPoolExecutor,核心池大小为 5,最大大小为 5,队列长度为 1。

public class CompleteTest {
    public static void main(String[] args) {
        ExecutorService executorService = new ThreadPoolExecutor(5, 5, 10,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));

        ArrayList<CompletableFuture<String>> list = new ArrayList<>();

        for (int i = 0; i <100; i++) {
            CompletableFuture<String> stringCompletableFuture = new CompletableFuture<>();
            stringCompletableFuture.whenCompleteAsync((e, a) -> {
                System.out.println("Complete " + e);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e1) {e1.printStackTrace();}
            }, executorService);

            list.add(stringCompletableFuture);
        }

        for (int i = 0; i < list.size(); i++) {
            list.get(i).complete(i + "");
        }
    }
}

当我 运行 代码时,即使我完成了 100 个期货,也只打印了 6 个输出。那是 5 个核心线程和 1 个排队线程。剩下的怎么办?如果其他runnable因为队列已满而无法提交给executor服务,应该不会有异常吧?

输出

Complete 0
Complete 1
Complete 2
Complete 3
Complete 4
Complete 5

抛出异常,并且 CompletableFuture 异常完成,只是不是您正在跟踪的任何一个。

您正在使用构造函数实例化和初始化 ThreadPoolExecutor,该构造函数使用默认的 RejectedExecutionHandler,只会引发异常。我们知道,如果 ExecutorService 无法接受任务,则会抛出 RejectedExecutionException。那么任务添加到哪里,异常又抛到哪里呢?

就目前而言,所有链接都发生在 whenCompleteAsync 内。当你调用它时,你向接收者添加了一个依赖项 CompletableFuturestringCompletableFuture。当 stringCompletableFuture 完成时(在本例中成功),它将创建一个新的 CompletableFuture(它是 returns)并尝试在给定的 BiConsumer 上安排给定的 BiConsumer =16=]。

由于 ExecutorService 的队列没有 space,它会调用 RejectedExecutionHandler,这将抛出 RejectedExecutionException。该异常在那时被捕获并用于 completeExceptionally 将返回的 CompletableFuture

换句话说,在你的 for 循环中,捕获 whenCompleteAsync 返回的 CompletableFuture,存储它,并打印出它的状态。

ArrayList<CompletableFuture<String>> list = new ArrayList<>();
ArrayList<CompletableFuture<?>> dependents = new ArrayList<>();
for (int i = 0; i <100; i++) {
    CompletableFuture<String> stringCompletableFuture = new CompletableFuture<>();
    CompletableFuture<?> thisWillHaveException = stringCompletableFuture.whenCompleteAsync((e, a) -> {
        System.out.println("Complete " + e);
        try {
            Thread.sleep(100);
        } catch (InterruptedException e1) {e1.printStackTrace();}
    }, executorService);
    dependents.add(thisWillHaveException);
    list.add(stringCompletableFuture);
}

for (int i = 0; i < list.size(); i++) {
    list.get(i).complete(i + "");
}
Thread.sleep(2000);
dependents.forEach(cf -> {
    cf.whenComplete((r, e) -> {
        if (e != null)
            System.out.println(cf + " " + e.getMessage());
    });
});

您会注意到它们(除了之前成功打印的 6 个)都异常完成,并且 RejectedExecutionException

...
java.util.concurrent.CompletableFuture@2d8e6db6[Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$UniWhenComplete@3f91beef rejected from java.util.concurrent.ThreadPoolExecutor@4eec7777[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
java.util.concurrent.CompletableFuture@23ab930d[Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$UniWhenComplete@1a6c5a9e rejected from java.util.concurrent.ThreadPoolExecutor@4eec7777[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
java.util.concurrent.CompletableFuture@4534b60d[Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$UniWhenComplete@37bba400 rejected from java.util.concurrent.ThreadPoolExecutor@4eec7777[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]