java.util.concurrent.CompletableFuture 中的异常传播

Exception propagation in java.util.concurrent.CompletableFuture

有两段代码。

在第一个中,我们从总是抛出一些异常的任务创建 CompletableFuture。然后我们对这个未来应用 "exceptionally" 方法,然后是 "theAccept" 方法。我们不会将 Accept 方法返回的新未来分配给任何变量。然后我们在原始未来调用 "join" 。我们看到的是 "exceptionally" 方法和 "thenAccept" 方法都被调用了。我们看到它是因为他们在输出中打印了适当的行。但是异常没有被 "exceptionally" 方法抑制。在这种情况下,抑制异常并为我们提供一些默认值正是我们对 "exceptionally" 的期望。

在第二个片段中,我们做几乎相同的事情,但将新返回的 future 分配给变量并在其上调用 "join"。在这种情况下,正如预期的那样,异常被抑制了。

从我的角度来看,对于第一部分,一致的行为要么不抑制异常并且不调用 "exceptionally" 和 "thenAccept",要么异常调用并抑制异常。

为什么我们要介于两者之间?

第一个片段:

public class TestClass {
    public static void main(String[] args) {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(TestClass::doSomethingForInteger);

        future.exceptionally(e -> {
                    System.out.println("Exceptionally");
                    return 42;
                })
                .thenAccept(r -> {
                    System.out.println("Accept");
                });

        future.join();
    }

    private static int doSomethingForInteger() {
        throw new IllegalArgumentException("Error");
    }
}

第二个片段:

public class TestClass {
    public static void main(String[] args) {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(TestClass::doSomethingForInteger);

        CompletableFuture<Void> voidCompletableFuture = future.exceptionally(e -> {
            System.out.println("Exceptionally");
            return 42;
        })
                .thenAccept(r -> {
                    System.out.println("Accept");
                });

        voidCompletableFuture.join();
    }

    private static int doSomethingForInteger() {
        throw new IllegalArgumentException("Error");
    }
}

没有“抑制异常”这样的东西。当您调用 exceptionally 时,您正在创建一个新的未来,如果前一阶段异常完成,它将使用前一阶段的结果或评估函数的结果来完成。前一阶段,即您调用 exceptionally 的未来,不受影响。

这适用于所有 链接依赖函数或操作的方法。这些方法中的每一种都创建了一个新的未来,它将按照记录完成。 None 其中会影响您调用该方法的现有未来。

也许,通过以下示例会变得更加清晰:

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
    return "a string";
});

CompletableFuture<Integer> f2 = f1.thenApply(s -> {
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
    return s.length();
});

f2.thenAccept(i -> System.out.println("result of f2 = "+i));

String s = f1.join();
System.out.println("result of f1 = "+s);

ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);

这里需要明确的是,依赖阶段的结果Integer不能取代前提阶段的结果String。这只是两种不同的未来,有着不同的结果。由于在 f1 上调用 join() 查询第一阶段的结果,因此它不依赖于 f2,因此甚至不等待其完成。 (这也是代码等待所有后台结束的原因activity)。

exceptionally的用法没有区别。在非例外情​​况下,下一个阶段具有相同的类型甚至相同的结果可能会令人困惑,但这不会改变存在两个不同阶段的事实。

static void report(String s, CompletableFuture<?> f) {
    f.whenComplete((i,t) -> {
        if(t != null) System.out.println(s+" completed exceptionally with "+t);
        else System.out.println(s+" completed with value "+i);
    });
}
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
    throw new IllegalArgumentException("Error for testing");
});
CompletableFuture<Integer> f2 = f1.exceptionally(t -> 42);

report("f1", f1);
report("f2", f2);

ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);

似乎人们普遍认为 CompletableFuture 链接方法是某种单一未来的建设者,不幸的是,这是误导性的错误。另一个陷阱是以下错误:

CompletableFuture<?> f = CompletableFuture.supplyAsync(() -> {
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
    System.out.println("initial stage");
    return "";
}).thenApply(s -> {
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
    System.out.println("second stage");
    return s;
}).thenApply(s -> {
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
    System.out.println("third stage");
    return s;
}).thenAccept(s -> {
    System.out.println("last stage");
});

f.cancel(true);
report("f", f);

ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);

如前所述,每个链接方法都会创建一个新阶段,因此保留对最后一个链接方法返回的阶段(即最后一个阶段)的引用适合获得最终结果。但是取消这个阶段只会取消最后一个阶段和 none 个先决条件阶段。此外,取消后,最后一个阶段不再依赖于其他阶段,因为它已经通过取消完成并且能够报告此异常结果,而其他现在不相关的阶段仍在后台评估。