如何获取 CompletableFuture 处理程序之外的异常?
How to Obtain the Exception Outside of CompletableFuture Handler?
我遇到以下情况,想看看是否有解决方案:
- 必须并行调用两个 Spring 服务(一个是现有服务 call/logic,第二个是新增服务)。
- 然后应合并结果并由 RESTful API return 编辑。
快乐之路应该是直截了当的,但是,当涉及到服务发出的错误时,应遵守以下规则:
API 只有在两个服务调用都失败时才会失败——这应该从主线程而不是 @Async
池中抛出,因为它们是独立的线程并且不会无法访问彼此的异常(至少这是我的推理)。
如果其中只有一个失败,则通过另一个服务(异步)记录错误,并且 API returns 仅来自成功服务的结果 - - 这可以从各自的 @Async
线程完成。
@Service
public class Serv1 interface ServInf {
@Async("customPool")
public CompletableFuture<List<Obj>> getSomething(int id) {
// The service ensures that the list is never null, but it can be empty
return CompletableFuture.completedFuture(/* calling an external RESTful API */);
}
}
@Service
public class Serv2 interface ServInf {
@Async("customPool")
public CompletableFuture<List<Obj>> getSomething(int id) {
// The service ensures that the list is never null, but it can be empty
return CompletableFuture.completedFuture(/* calling another external RESTful API */);
}
}
@RestController
public class MyController {
/** Typical service @Autowired's */
@GetMapping(/* ... */)
public WrapperObj getById(String id) {
CompletableFuture<List<String>> service1Result =
service1.getSomething(id)
.thenApply(result -> {
if (result == null) { return null; }
return result.stream().map(Obj::getName).collect(Collectors.toList());
})
.handle((result, exception) -> {
if (exception != null) {
// Call another asynchronous logging service which should be easy
return null;
} else {
return result;
}
});
CompletableFuture<List<String>> service2Result =
service2.getSomething(id)
.thenApply(result -> {
if (result == null) { return null; }
return result.stream().map(Obj::getName).collect(Collectors.toList());
})
.handle((result, exception) -> {
if (exception != null) {
// Call another asynchronous logging service which should be easy
return null;
} else {
return result;
}
});
// Blocking till we get the results from both services
List<String> result1 = service1Result.get();
List<String> result2 = service2Result.get();
/** Where to get the exceptions thrown by the services if both fail
if (result1 == null && result2 == null) {
/** Signal that the API needs to fail as a whole */
throw new CustomException( /** where to get the messages? */);
}
/** merge and return the result */
}
}
我的问题是,由于这些服务 return 一些对象的列表,即使我使用 CompletableFuture.handle()
并检查是否存在异常,我也不能 return异常本身是为了捕获并让 Spring 建议 class 处理它(链接到 return 列表)。
我想到的一件事是使用 AtomicReference
来捕获异常并将它们设置在 handle()
中并在期货为 done/complete 时使用它们,例如
AtomicReference<Throwable> ce1 = new AtomicReference<>();
AtomicReference<Throwable> ce2 = new AtomicReference<>();
.handle((result, exception) -> {
if (exception != null) {
ce1.set(exception);
return null; // This signals that there was a failure
} else {
return result;
}
});
List<String> result1 = service1Result.get();
List<String> result2 = service2Result.get();
/** Where to get the exceptions thrown by the services if both fail
if (result1 == null && result2 == null) {
/** Signal that the API needs to fail as a whole */
throw new CustomException(/** do logic to capture ce1.get().getMessage() + ce2.get().getMessage() */);
}
首先,这听起来像是多线程异步调用中的可行解决方案吗?
其次,这看起来很乱,所以我想知道是否有更优雅的方法在 Spring 异步池之外捕获这些异常,并在主线程中处理它,例如结合异常信息,抛给Spring Advice exception handler.
CompletableFutures 处理起来相当麻烦,但在我看来,这是一种更实用、反应更灵敏的方法。
我们需要 :
中的 sequence
方法
static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> com.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
}
然后,我使用 Optional
来表示操作的状态,但是 更适合(所以如果您的代码库中有这样的实用程序,请使用一个 - Java 还没有自带):
CompletableFuture<Optional<List<Object>>> future1 = service1.getSomething().thenApply(Optional::of).exceptionally(e -> {
// log e
return Optional.empty();
});
CompletableFuture<Optional<List<Object>>> future2 = service2.getSomething().thenApply(Optional::of).exceptionally(e -> {
// log e
return Optional.empty();
});
现在等待两个期货并在结果可用时处理:
CompletableFuture<List<Object>> mergedResults = sequence(Arrays.asList(future1, future2)).thenApply(results -> {
Optional<List<Object>> result1 = results.get(0);
Optional<List<Object>> result2 = results.get(1);
if (result1.isEmpty() && result2.isEmpty()) {
throw new CustomException(...);
}
//
return Stream.of(
result1.map(Stream::of).orElseGet(Stream::empty),
result2.map(Stream::of).orElseGet(Stream::empty)
).collect(Collectors.toList());
});
那你最好直接returnmergedResults
让框架给你处理,这样你就不会阻塞任何线程,或者你可以.get()
就可以了(这将阻塞线程),如果您的 CustomException
(或任何其他异常)被抛出(可在 e.getCause()
中访问),它将抛出 ExecutionException
。
如果您已经在使用 Project Reactor(或同等产品),这看起来会更简单,但想法大致相同。
假设有两个期货
CompletableFuture<List<String>> service1Result = …
CompletableFuture<List<String>> service2Result = …
一种straight-forward结合两种期货的方法是
CompletableFuture<List<String>> both = service1Result.thenCombine(service2Result,
(list1, list2) -> Stream.concat(list1.stream(), list2.stream())
.collect(Collectors.toList()));
但是如果任何一个 future 失败,这个 future 都会失败。
要仅在两个 futures 都失败时才失败并从两个 throwable 构造一个新的异常,我们可以定义两个实用方法:
private static Throwable getThrowable(CompletableFuture<?> f) {
return f.<Throwable>thenApply(value -> null)
.exceptionally(throwable -> throwable).join();
}
private static <T> T throwCustom(Throwable t1, Throwable t2) {
throw new CustomException(t1.getMessage() + " and " + t2.getMessage());
}
方法getThrowable
旨在用于已知异常完成的未来。我们可以调用 join
并捕获异常,但如上所示,我们还可以将 future 转换为包含 throwable 作为其值的 non-exceptional future。
然后,我们可以将以上所有内容结合起来
CompletableFuture<List<String>> failOnlyWhenBothFailed = both
.thenApply(list -> both)
.exceptionally(t ->
!service1Result.isCompletedExceptionally()? service1Result:
!service2Result.isCompletedExceptionally()? service2Result:
throwCustom(getThrowable(service1Result), getThrowable(service2Result)))
.thenCompose(Function.identity());
在传递给 exceptionally
的函数中,传入的 futures 已知已完成,因此我们可以使用实用程序方法提取可抛出的对象并抛出新的异常。
这样做的好处是得到的构造是non-blocking.
但在你的情况下,你想等待完成而不是返回一个未来,所以我们可以简化操作:
CompletableFuture<List<String>> both = service1Result.thenCombine(service2Result,
(list1, list2) -> Stream.concat(list1.stream(), list2.stream())
.collect(Collectors.toList()));
both.exceptionally(t -> null).join();
if(service1Result.isCompletedExceptionally()&&service2Result.isCompletedExceptionally()){
Throwable t1 = getThrowable(service1Result), t2 = getThrowable(service2Result);
throw new CustomException(t1.getMessage() + " and " + t2.getMessage());
}
List<String> result = (
service1Result.isCompletedExceptionally()? service2Result:
service2Result.isCompletedExceptionally()? service1Result: both
).join();
通过使用 both.exceptionally(t -> null).join();
,我们等待两个作业的完成,而不会在失败时抛出异常。在这条语句之后,我们可以安全地使用 isCompletedExceptionally()
来检查我们知道要完成的期货。
因此,如果两者都失败了,我们提取可抛出的对象并抛出我们的自定义异常,否则,我们检查哪些任务成功并提取其中一个或两个的结果。
仅仅因为我提出它是一种可能性,我想像这样的东西应该在使用 Project Reactor 的世界中起作用:
首先我们将服务修改为 return Mono
s,这很容易使用 Mono.fromFuture
(或者你可以将一个服务变成 Reactor 风格,如果它准备好了) :
@Service
public class Serv1 implements ServInf {
public Mono<List<Obj>> getSomething(int id) {
// The service ensures that the list is never null, but it can be empty
return Mono.fromFuture(CompletableFuture.completedFuture(/* calling an external RESTful API */));
//This Mono will either emit the result or complete with an error in case of Exception
}
}
//similar for Serv2
(反应性)端点可能如下所示(请参阅下面的编号评论):
public Mono<WrapperObj> getById(String id) {
WrapperObj wrapper = new WrapperObj(); //1
Mono<Optional<List<Obj>>> s1Mono = serv1.getSomething(id)
.subscribeOn(Schedulers.boundedElastic()) //2
.map(Optional::ofNullable) //3
.doOnError(wrapper::setS1ErrorResult) //4
.onErrorResume(t -> Mono.just(Optional.empty())); //5
Mono<Optional<List<Obj>>> s2Mono = serv2.getSomething(id)
.subscribeOn(Schedulers.boundedElastic()) //2
.map(Optional::ofNullable) //3
.doOnError(wrapper::setS2ErrorResult) //4
.onErrorResume(t -> Mono.just(Optional.empty())); //5
return s1Mono
.zipWith(s2Mono) //6
.map(result ->
//transforms non-error results and merges them into the wrapper object
transformResult(result.getT1().orElse(null), result.getT2().orElse(null), wrapper) //7
)
.switchIfEmpty(Mono.just(wrapper)) //8
;
}
评论:
结果用于'accumulate'结果和Exceptions
在boundedElastic
线程池上调用服务,建议用于较长的IO任务。
将结果包装在Optional
中。我使用一个空的 Optional 作为错误完成的便利结果,因为 null
s 不能很好地通过 Reactor 传播。
如果服务调用抛出异常,我们可以在WrapperObj
上设置相应的错误结果。这类似于您对 AtomicReference
的使用,但没有创建额外的对象。
但是,这样的异常会导致 zipWith
(6) 失败,因此如果发生这种情况,我们将替换为 Optional.empty()
结果。
zipWith
创建两个结果的元组
我们处理这些结果,替换
剩下的就是对两个(non-exceptional)结果进行变换:
private WrapperObj transformResult(List<Obj> s1Result, List<Obj> s2Result, WrapperObj wrapper) {
//perform your result transformation and
//flesh out 'wrapper' with the results
//if there was an exception, the 'wrapper' contains the corresponding exception values
return wrapper;
}
我遇到以下情况,想看看是否有解决方案:
- 必须并行调用两个 Spring 服务(一个是现有服务 call/logic,第二个是新增服务)。
- 然后应合并结果并由 RESTful API return 编辑。
快乐之路应该是直截了当的,但是,当涉及到服务发出的错误时,应遵守以下规则:
API 只有在两个服务调用都失败时才会失败——这应该从主线程而不是
@Async
池中抛出,因为它们是独立的线程并且不会无法访问彼此的异常(至少这是我的推理)。如果其中只有一个失败,则通过另一个服务(异步)记录错误,并且 API returns 仅来自成功服务的结果 - - 这可以从各自的
@Async
线程完成。@Service public class Serv1 interface ServInf { @Async("customPool") public CompletableFuture<List<Obj>> getSomething(int id) { // The service ensures that the list is never null, but it can be empty return CompletableFuture.completedFuture(/* calling an external RESTful API */); } } @Service public class Serv2 interface ServInf { @Async("customPool") public CompletableFuture<List<Obj>> getSomething(int id) { // The service ensures that the list is never null, but it can be empty return CompletableFuture.completedFuture(/* calling another external RESTful API */); } } @RestController public class MyController { /** Typical service @Autowired's */ @GetMapping(/* ... */) public WrapperObj getById(String id) { CompletableFuture<List<String>> service1Result = service1.getSomething(id) .thenApply(result -> { if (result == null) { return null; } return result.stream().map(Obj::getName).collect(Collectors.toList()); }) .handle((result, exception) -> { if (exception != null) { // Call another asynchronous logging service which should be easy return null; } else { return result; } }); CompletableFuture<List<String>> service2Result = service2.getSomething(id) .thenApply(result -> { if (result == null) { return null; } return result.stream().map(Obj::getName).collect(Collectors.toList()); }) .handle((result, exception) -> { if (exception != null) { // Call another asynchronous logging service which should be easy return null; } else { return result; } }); // Blocking till we get the results from both services List<String> result1 = service1Result.get(); List<String> result2 = service2Result.get(); /** Where to get the exceptions thrown by the services if both fail if (result1 == null && result2 == null) { /** Signal that the API needs to fail as a whole */ throw new CustomException( /** where to get the messages? */); } /** merge and return the result */ } }
我的问题是,由于这些服务 return 一些对象的列表,即使我使用 CompletableFuture.handle()
并检查是否存在异常,我也不能 return异常本身是为了捕获并让 Spring 建议 class 处理它(链接到 return 列表)。
我想到的一件事是使用 AtomicReference
来捕获异常并将它们设置在 handle()
中并在期货为 done/complete 时使用它们,例如
AtomicReference<Throwable> ce1 = new AtomicReference<>();
AtomicReference<Throwable> ce2 = new AtomicReference<>();
.handle((result, exception) -> {
if (exception != null) {
ce1.set(exception);
return null; // This signals that there was a failure
} else {
return result;
}
});
List<String> result1 = service1Result.get();
List<String> result2 = service2Result.get();
/** Where to get the exceptions thrown by the services if both fail
if (result1 == null && result2 == null) {
/** Signal that the API needs to fail as a whole */
throw new CustomException(/** do logic to capture ce1.get().getMessage() + ce2.get().getMessage() */);
}
首先,这听起来像是多线程异步调用中的可行解决方案吗?
其次,这看起来很乱,所以我想知道是否有更优雅的方法在 Spring 异步池之外捕获这些异常,并在主线程中处理它,例如结合异常信息,抛给Spring Advice exception handler.
CompletableFutures 处理起来相当麻烦,但在我看来,这是一种更实用、反应更灵敏的方法。
我们需要
sequence
方法
static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> com.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
}
然后,我使用 Optional
来表示操作的状态,但是
CompletableFuture<Optional<List<Object>>> future1 = service1.getSomething().thenApply(Optional::of).exceptionally(e -> {
// log e
return Optional.empty();
});
CompletableFuture<Optional<List<Object>>> future2 = service2.getSomething().thenApply(Optional::of).exceptionally(e -> {
// log e
return Optional.empty();
});
现在等待两个期货并在结果可用时处理:
CompletableFuture<List<Object>> mergedResults = sequence(Arrays.asList(future1, future2)).thenApply(results -> {
Optional<List<Object>> result1 = results.get(0);
Optional<List<Object>> result2 = results.get(1);
if (result1.isEmpty() && result2.isEmpty()) {
throw new CustomException(...);
}
//
return Stream.of(
result1.map(Stream::of).orElseGet(Stream::empty),
result2.map(Stream::of).orElseGet(Stream::empty)
).collect(Collectors.toList());
});
那你最好直接returnmergedResults
让框架给你处理,这样你就不会阻塞任何线程,或者你可以.get()
就可以了(这将阻塞线程),如果您的 CustomException
(或任何其他异常)被抛出(可在 e.getCause()
中访问),它将抛出 ExecutionException
。
如果您已经在使用 Project Reactor(或同等产品),这看起来会更简单,但想法大致相同。
假设有两个期货
CompletableFuture<List<String>> service1Result = …
CompletableFuture<List<String>> service2Result = …
一种straight-forward结合两种期货的方法是
CompletableFuture<List<String>> both = service1Result.thenCombine(service2Result,
(list1, list2) -> Stream.concat(list1.stream(), list2.stream())
.collect(Collectors.toList()));
但是如果任何一个 future 失败,这个 future 都会失败。
要仅在两个 futures 都失败时才失败并从两个 throwable 构造一个新的异常,我们可以定义两个实用方法:
private static Throwable getThrowable(CompletableFuture<?> f) {
return f.<Throwable>thenApply(value -> null)
.exceptionally(throwable -> throwable).join();
}
private static <T> T throwCustom(Throwable t1, Throwable t2) {
throw new CustomException(t1.getMessage() + " and " + t2.getMessage());
}
方法getThrowable
旨在用于已知异常完成的未来。我们可以调用 join
并捕获异常,但如上所示,我们还可以将 future 转换为包含 throwable 作为其值的 non-exceptional future。
然后,我们可以将以上所有内容结合起来
CompletableFuture<List<String>> failOnlyWhenBothFailed = both
.thenApply(list -> both)
.exceptionally(t ->
!service1Result.isCompletedExceptionally()? service1Result:
!service2Result.isCompletedExceptionally()? service2Result:
throwCustom(getThrowable(service1Result), getThrowable(service2Result)))
.thenCompose(Function.identity());
在传递给 exceptionally
的函数中,传入的 futures 已知已完成,因此我们可以使用实用程序方法提取可抛出的对象并抛出新的异常。
这样做的好处是得到的构造是non-blocking.
但在你的情况下,你想等待完成而不是返回一个未来,所以我们可以简化操作:
CompletableFuture<List<String>> both = service1Result.thenCombine(service2Result,
(list1, list2) -> Stream.concat(list1.stream(), list2.stream())
.collect(Collectors.toList()));
both.exceptionally(t -> null).join();
if(service1Result.isCompletedExceptionally()&&service2Result.isCompletedExceptionally()){
Throwable t1 = getThrowable(service1Result), t2 = getThrowable(service2Result);
throw new CustomException(t1.getMessage() + " and " + t2.getMessage());
}
List<String> result = (
service1Result.isCompletedExceptionally()? service2Result:
service2Result.isCompletedExceptionally()? service1Result: both
).join();
通过使用 both.exceptionally(t -> null).join();
,我们等待两个作业的完成,而不会在失败时抛出异常。在这条语句之后,我们可以安全地使用 isCompletedExceptionally()
来检查我们知道要完成的期货。
因此,如果两者都失败了,我们提取可抛出的对象并抛出我们的自定义异常,否则,我们检查哪些任务成功并提取其中一个或两个的结果。
仅仅因为我提出它是一种可能性,我想像这样的东西应该在使用 Project Reactor 的世界中起作用:
首先我们将服务修改为 return Mono
s,这很容易使用 Mono.fromFuture
(或者你可以将一个服务变成 Reactor 风格,如果它准备好了) :
@Service
public class Serv1 implements ServInf {
public Mono<List<Obj>> getSomething(int id) {
// The service ensures that the list is never null, but it can be empty
return Mono.fromFuture(CompletableFuture.completedFuture(/* calling an external RESTful API */));
//This Mono will either emit the result or complete with an error in case of Exception
}
}
//similar for Serv2
(反应性)端点可能如下所示(请参阅下面的编号评论):
public Mono<WrapperObj> getById(String id) {
WrapperObj wrapper = new WrapperObj(); //1
Mono<Optional<List<Obj>>> s1Mono = serv1.getSomething(id)
.subscribeOn(Schedulers.boundedElastic()) //2
.map(Optional::ofNullable) //3
.doOnError(wrapper::setS1ErrorResult) //4
.onErrorResume(t -> Mono.just(Optional.empty())); //5
Mono<Optional<List<Obj>>> s2Mono = serv2.getSomething(id)
.subscribeOn(Schedulers.boundedElastic()) //2
.map(Optional::ofNullable) //3
.doOnError(wrapper::setS2ErrorResult) //4
.onErrorResume(t -> Mono.just(Optional.empty())); //5
return s1Mono
.zipWith(s2Mono) //6
.map(result ->
//transforms non-error results and merges them into the wrapper object
transformResult(result.getT1().orElse(null), result.getT2().orElse(null), wrapper) //7
)
.switchIfEmpty(Mono.just(wrapper)) //8
;
}
评论:
结果用于'accumulate'结果和Exceptions
在
boundedElastic
线程池上调用服务,建议用于较长的IO任务。将结果包装在
Optional
中。我使用一个空的 Optional 作为错误完成的便利结果,因为null
s 不能很好地通过 Reactor 传播。如果服务调用抛出异常,我们可以在
WrapperObj
上设置相应的错误结果。这类似于您对AtomicReference
的使用,但没有创建额外的对象。但是,这样的异常会导致
zipWith
(6) 失败,因此如果发生这种情况,我们将替换为Optional.empty()
结果。zipWith
创建两个结果的元组我们处理这些结果,替换
剩下的就是对两个(non-exceptional)结果进行变换:
private WrapperObj transformResult(List<Obj> s1Result, List<Obj> s2Result, WrapperObj wrapper) { //perform your result transformation and //flesh out 'wrapper' with the results //if there was an exception, the 'wrapper' contains the corresponding exception values return wrapper; }