如何获取 CompletableFuture 处理程序之外的异常?

How to Obtain the Exception Outside of CompletableFuture Handler?

我遇到以下情况,想看看是否有解决方案:

快乐之路应该是直截了当的,但是,当涉及到服务发出的错误时,应遵守以下规则:

我的问题是,由于这些服务 return 一些对象的列表,即使我使用 CompletableFuture.handle() 并检查是否存在异常,我也不能 return异常本身是为了捕获并让 Spring 建议 class 处理它(链接到 return 列表)。

我想到的一件事是使用 AtomicReference 来捕获异常并将它们设置在 handle() 中并在期货为 done/complete 时使用它们,例如

AtomicReference<Throwable> ce1 = new AtomicReference<>();
AtomicReference<Throwable> ce2 = new AtomicReference<>();

.handle((result, exception) -> {
    if (exception != null) {
        ce1.set(exception);
        return null; // This signals that there was a failure
    } else {
        return result;
    }
});

List<String> result1 = service1Result.get();
List<String> result2 = service2Result.get();

/** Where to get the exceptions thrown by the services if both fail
if (result1 == null && result2 == null) {
    /** Signal that the API needs to fail as a whole */
    throw new CustomException(/** do logic to capture ce1.get().getMessage() + ce2.get().getMessage() */);
}

首先,这听起来像是多线程异步调用中的可行解决方案吗?

其次,这看起来很乱,所以我想知道是否有更优雅的方法在 Spring 异步池之外捕获这些异常,并在主线程中处理它,例如结合异常信息,抛给Spring Advice exception handler.

CompletableFutures 处理起来相当麻烦,但在我看来,这是一种更实用、反应更灵敏的方法。

我们需要 :

中的 sequence 方法
static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
    return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
            .thenApply(v -> com.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
            );
}

然后,我使用 Optional 来表示操作的状态,但是 更适合(所以如果您的代码库中有这样的实用程序,请使用一个 - Java 还没有自带):

CompletableFuture<Optional<List<Object>>> future1 = service1.getSomething().thenApply(Optional::of).exceptionally(e -> {
    // log e
    return Optional.empty();
});
CompletableFuture<Optional<List<Object>>> future2 = service2.getSomething().thenApply(Optional::of).exceptionally(e -> {
    // log e
    return Optional.empty();
});

现在等待两个期货并在结果可用时处理:

CompletableFuture<List<Object>> mergedResults = sequence(Arrays.asList(future1, future2)).thenApply(results -> {
    Optional<List<Object>> result1 = results.get(0);
    Optional<List<Object>> result2 = results.get(1);
    if (result1.isEmpty() && result2.isEmpty()) {
        throw new CustomException(...);
    }
    // 
    return Stream.of(
            result1.map(Stream::of).orElseGet(Stream::empty),
            result2.map(Stream::of).orElseGet(Stream::empty)
    ).collect(Collectors.toList());
});

那你最好直接returnmergedResults让框架给你处理,这样你就不会阻塞任何线程,或者你可以.get()就可以了(这将阻塞线程),如果您的 CustomException(或任何其他异常)被抛出(可在 e.getCause() 中访问),它将抛出 ExecutionException


如果您已经在使用 Project Reactor(或同等产品),这看起来会更简单,但想法大致相同。

假设有两个期货

CompletableFuture<List<String>> service1Result = …
CompletableFuture<List<String>> service2Result = …

一种straight-forward结合两种期货的方法是

CompletableFuture<List<String>> both = service1Result.thenCombine(service2Result,
    (list1, list2) -> Stream.concat(list1.stream(), list2.stream())
                            .collect(Collectors.toList()));

但是如果任何一个 future 失败,这个 future 都会失败。

要仅在两个 futures 都失败时才失败并从两个 throwable 构造一个新的异常,我们可以定义两个实用方法:

private static Throwable getThrowable(CompletableFuture<?> f) {
    return f.<Throwable>thenApply(value -> null)
            .exceptionally(throwable -> throwable).join();
}

private static <T> T throwCustom(Throwable t1, Throwable t2) {
    throw new CustomException(t1.getMessage() + " and " + t2.getMessage());
}

方法getThrowable旨在用于已知异常完成的未来。我们可以调用 join 并捕获异常,但如上所示,我们还可以将 future 转换为包含 throwable 作为其值的 non-exceptional future。

然后,我们可以将以上所有内容结合起来

CompletableFuture<List<String>> failOnlyWhenBothFailed = both
    .thenApply(list -> both)
    .exceptionally(t ->
        !service1Result.isCompletedExceptionally()? service1Result:
        !service2Result.isCompletedExceptionally()? service2Result:
        throwCustom(getThrowable(service1Result), getThrowable(service2Result)))
    .thenCompose(Function.identity());

在传递给 exceptionally 的函数中,传入的 futures 已知已完成,因此我们可以使用实用程序方法提取可抛出的对象并抛出新的异常。

这样做的好处是得到的构造是non-blocking.

但在你的情况下,你想等待完成而不是返回一个未来,所以我们可以简化操作:

CompletableFuture<List<String>> both = service1Result.thenCombine(service2Result,
    (list1, list2) -> Stream.concat(list1.stream(), list2.stream())
                            .collect(Collectors.toList()));

both.exceptionally(t -> null).join();

if(service1Result.isCompletedExceptionally()&&service2Result.isCompletedExceptionally()){
  Throwable t1 = getThrowable(service1Result), t2 = getThrowable(service2Result);
  throw new CustomException(t1.getMessage() + " and " + t2.getMessage());
}

List<String> result = (
    service1Result.isCompletedExceptionally()? service2Result:
    service2Result.isCompletedExceptionally()? service1Result: both
).join();

通过使用 both.exceptionally(t -> null).join();,我们等待两个作业的完成,而不会在失败时抛出异常。在这条语句之后,我们可以安全地使用 isCompletedExceptionally() 来检查我们知道要完成的期货。

因此,如果两者都失败了,我们提取可抛出的对象并抛出我们的自定义异常,否则,我们检查哪些任务成功并提取其中一个或两个的结果。

仅仅因为我提出它是一种可能性,我想像这样的东西应该在使用 Project Reactor 的世界中起作用:

首先我们将服务修改为 return Monos,这很容易使用 Mono.fromFuture(或者你可以将一个服务变成 Reactor 风格,如果它准备好了) :

@Service
public class Serv1 implements ServInf {
    public Mono<List<Obj>> getSomething(int id) {
        // The service ensures that the list is never null, but it can be empty
        return Mono.fromFuture(CompletableFuture.completedFuture(/* calling an external RESTful API */));
        //This Mono will either emit the result or complete with an error in case of Exception
    }
}

//similar for Serv2

(反应性)端点可能如下所示(请参阅下面的编号评论):

public Mono<WrapperObj> getById(String id) {
        WrapperObj wrapper = new WrapperObj(); //1
        Mono<Optional<List<Obj>>> s1Mono = serv1.getSomething(id)
            .subscribeOn(Schedulers.boundedElastic()) //2
            .map(Optional::ofNullable) //3
            .doOnError(wrapper::setS1ErrorResult) //4
            .onErrorResume(t -> Mono.just(Optional.empty())); //5

        Mono<Optional<List<Obj>>> s2Mono = serv2.getSomething(id)
            .subscribeOn(Schedulers.boundedElastic()) //2
            .map(Optional::ofNullable) //3
            .doOnError(wrapper::setS2ErrorResult) //4
            .onErrorResume(t -> Mono.just(Optional.empty())); //5

        return s1Mono
            .zipWith(s2Mono) //6
            .map(result ->
                //transforms non-error results and merges them into the wrapper object
                transformResult(result.getT1().orElse(null), result.getT2().orElse(null), wrapper) //7
            )
            .switchIfEmpty(Mono.just(wrapper)) //8

        ;
    }

评论:

  1. 结果用于'accumulate'结果和Exceptions

  2. boundedElastic线程池上调用服务,建议用于较长的IO任务。

  3. 将结果包装在Optional中。我使用一个空的 Optional 作为错误完成的便利结果,因为 nulls 不能很好地通过 Reactor 传播。

  4. 如果服务调用抛出异常,我们可以在WrapperObj上设置相应的错误结果。这类似于您对 AtomicReference 的使用,但没有创建额外的对象。

  5. 但是,这样的异常会导致 zipWith (6) 失败,因此如果发生这种情况,我们将替换为 Optional.empty() 结果。

  6. zipWith 创建两个结果的元组

  7. 我们处理这些结果,替换

  8. 剩下的就是对两个(non-exceptional)结果进行变换:

    private WrapperObj transformResult(List<Obj> s1Result, List<Obj> s2Result, WrapperObj wrapper) {
        //perform your result transformation and
        //flesh out 'wrapper' with the results
        //if there was an exception, the 'wrapper' contains the corresponding exception values
        return wrapper;
    }