CompletableFuture.exceptionally with executor

The CompletableFuture.exceptionally() method takes a lambda, but there is no overload that accepts a custom Executor, and not even an "...Async" variant.

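Which executor does the exceptionally lambda run on? Is it the same executor that ran the original CompletableFuture that threw the exception? Or (I would be surprised if this were the case) is it the commonPool?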

I did some experiments. It looks like the executor for the exception handler is not chosen deterministically.

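  1. If I do not set any breakpoints, the exception handler is sometimes executed on the same executor as the CompletableFuture it is attached to. I can also see that it runs in the same ForkJoinTask. Roughly once in every two or three runs, it executes on the main thread instead.
  2. If I set a breakpoint before attaching the exception handler and wait there for a moment, the exceptionally lambda is always executed on the main (calling) thread!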

Here is the experiment code:

    ForkJoinPool ex = new ForkJoinPool(2,
            ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, false);

    // AsyncThreadLocal.requestId.set((int)(Math.random()*1000));

    CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread().getName());
        throw new RuntimeException();
        // return 3;
        }, ex);

    CompletableFuture<Integer> f8 = f1.exceptionally(t -> {
        System.out.println(Thread.currentThread().getName());
        return 5;
    });

    Thread.sleep(10000);

The program output is sometimes

  ForkJoinPool-1-worker-1
  main

and at other times

ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-1

It looks like it runs on the same executor as its CompletionStage:
    public static void main(String[] args) throws Exception {
        //ExecutorService executorService = Executors.newFixedThreadPool(3);
        ExecutorService executorService = Executors.newWorkStealingPool();

        while (true) {
            int i = Instant.now().getNano();
            CompletableFuture<?> completableFuture = CompletableFuture.supplyAsync(
                    ()-> {
                        System.err.printf("async thread at %d -> %s\n", i, Thread.currentThread().getName());
                        try {
                            TimeUnit.SECONDS.sleep(2);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        throw new RuntimeException();
                    }, executorService);

            completableFuture.exceptionally(
                    (err)-> {
                        System.err.printf("error thread for %d -> %s\n", i, Thread.currentThread().getName());
                        return null;
                    });

            TimeUnit.SECONDS.sleep(5);
        }
    }

For the fixed-size pool:

async thread at 418000000 -> pool-1-thread-1

error thread for 418000000 -> pool-1-thread-1

async thread at 646000000 -> pool-1-thread-2

error thread for 646000000 -> pool-1-thread-2

async thread at 646000000 -> pool-1-thread-3

error thread for 646000000 -> pool-1-thread-3

async thread at 646000000 -> pool-1-thread-1

error thread for 646000000 -> pool-1-thread-1

async thread at 647000000 -> pool-1-thread-2

error thread for 647000000 -> pool-1-thread-2

For the work-stealing pool (4 cores):

async thread at 96000000 -> ForkJoinPool-1-worker-1

error thread for 96000000 -> ForkJoinPool-1-worker-1

async thread at 196000000 -> ForkJoinPool-1-worker-1

error thread for 196000000 -> ForkJoinPool-1-worker-1

async thread at 197000000 -> ForkJoinPool-1-worker-1

error thread for 197000000 -> ForkJoinPool-1-worker-1

async thread at 197000000 -> ForkJoinPool-1-worker-1

error thread for 197000000 -> ForkJoinPool-1-worker-1

async thread at 197000000 -> ForkJoinPool-1-worker-1

error thread for 197000000 -> ForkJoinPool-1-worker-1

async thread at 197000000 -> ForkJoinPool-1-worker-1

Without an executor:

async thread at 848000000 -> ForkJoinPool.commonPool-worker-1

error thread for 848000000 -> ForkJoinPool.commonPool-worker-1

async thread at 944000000 -> ForkJoinPool.commonPool-worker-1

error thread for 944000000 -> ForkJoinPool.commonPool-worker-1

async thread at 944000000 -> ForkJoinPool.commonPool-worker-1

error thread for 944000000 -> ForkJoinPool.commonPool-worker-1

async thread at 944000000 -> ForkJoinPool.commonPool-worker-1

error thread for 944000000 -> ForkJoinPool.commonPool-worker-1

async thread at 944000000 -> ForkJoinPool.commonPool-worker-1

error thread for 944000000 -> ForkJoinPool.commonPool-worker-1

From the JDK bug discussion "CompletableFuture.exceptionally may execute on main thread":

CompletableFuture.exceptionally does not take an Executor argument since it is not designed to execute the exceptionally task asynchronously.

If the dependent task is not yet completed then the exceptionally task will complete on the same thread that dependent tasks completes on.

If the dependent task is completed then the exceptionally task will complete on the thread that executed the call to exceptionally.

This is the same behaviour that will occur for any non-asynchronous execution, such as thenAccept.
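The two cases described above can be reproduced without relying on timing. The following is only a minimal sketch: the class name, the method names, and the thread name `demo-worker` are made up for illustration, and a CountDownLatch is used to hold the task back until the handler has been attached.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of the original experiment.
class ExceptionallyThreadDemo {

    // Case 1: the future has already failed when exceptionally() is called,
    // so the handler runs on the thread that calls exceptionally().
    static String handlerThreadWhenAlreadyFailed() {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("boom"));
        return failed.exceptionally(t -> Thread.currentThread().getName()).join();
    }

    // Case 2: the handler is attached while the future is still pending,
    // so it runs on the thread that later completes the future.
    static String handlerThreadWhenStillPending() {
        ExecutorService pool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "demo-worker"));
        try {
            CountDownLatch handlerAttached = new CountDownLatch(1);
            CompletableFuture<String> pending = CompletableFuture.supplyAsync(() -> {
                try {
                    handlerAttached.await(); // do not fail before the handler is registered
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                throw new RuntimeException("boom");
            }, pool);
            CompletableFuture<String> handled =
                    pending.exceptionally(t -> Thread.currentThread().getName());
            handlerAttached.countDown(); // now let the task fail
            return handled.join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("already failed -> " + handlerThreadWhenAlreadyFailed());
        System.out.println("still pending  -> " + handlerThreadWhenStillPending());
    }
}
```

The first method reports the name of whichever thread called it, and the second reports `demo-worker`. This matches the breakpoint observation in the question: pausing before attaching the handler gives the task time to fail, so the handler ends up on the calling thread.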

To guarantee that the exceptionally task is executed on a thread within the executor thread pool then it is necessary to use whenCompleteAsync or handleAsync and passing in the executor.
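As a rough illustration of that advice, the sketch below uses handleAsync with an explicit executor on a future that has already failed, which is the case where plain exceptionally would run on the caller. The class name, the fallback value 5 (borrowed from the question's code), and the thread name `handler-pool` are assumptions for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class showing recovery pinned to a chosen executor.
class HandleAsyncDemo {

    static String demo() {
        ExecutorService pool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "handler-pool"));
        try {
            // Already failed: a non-async handler would run on the calling thread.
            CompletableFuture<Integer> failed = new CompletableFuture<>();
            failed.completeExceptionally(new RuntimeException("boom"));

            AtomicReference<String> handlerThread = new AtomicReference<>();
            Integer result = failed.handleAsync((value, error) -> {
                handlerThread.set(Thread.currentThread().getName());
                return error != null ? 5 : value; // fall back to 5 on failure
            }, pool).join();

            return result + " on " + handlerThread.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Unlike exceptionally, handle and handleAsync are invoked on success as well as on failure, so the handler has to check which of the two arguments is set.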

Note that as of JDK 12 there is CompletionStage.exceptionallyAsync (and an exceptionallyAsync overload that takes an Executor).
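A minimal sketch of the JDK 12+ variant, assuming a single-threaded executor whose thread is named `recovery-pool` (the class name and thread name are illustrative only):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; requires JDK 12 or later for exceptionallyAsync.
class ExceptionallyAsyncDemo {

    static String demo() {
        ExecutorService pool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "recovery-pool"));
        try {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("boom"));

            // The handler is submitted to the pool even though the future
            // had already failed before the handler was attached.
            return failed.exceptionallyAsync(
                    t -> t.getMessage() + " handled on " + Thread.currentThread().getName(),
                    pool).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

On older JDKs the same effect should be achievable with handleAsync or whenCompleteAsync, as the quoted bug discussion suggests.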