How to handle exceptions for Mono.fromCompletionStage
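I need to convert a CompletionStage returned by an external library into a Mono inside a reactive pipeline. How do I handle exceptions thrown by the call (I want to ignore them and continue the sequence)? When the external call throws, the onErrorResume / doOn* operators are not invoked (probably because the exception is thrown before the Mono is ever created).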
private void example() {
    Flux.range(1, 2)
        .flatMap(i ->
            Mono.fromCompletionStage(externalCall(i))
                .doOnNext(ni -> System.out.println("onNext: " + ni))
                .doOnError(err -> System.err.println("onError: " + err.getMessage()))
                .onErrorResume(e -> Mono.empty())
        )
        .subscribe();
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

CompletionStage<String> externalCall(int i) {
    if (new Random().nextBoolean()) {
        throw new RuntimeException("Exception in external call");
    }
    return Mono.just(i)
        .map(e -> String.valueOf((char) (e + 64)))
        .toFuture();
}
Stack trace:
2021-10-11T02:16:09,944 main r.c.p.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Exception in external call
Caused by: java.lang.RuntimeException: Exception in external call
at lrn.chap.SubscribingInFlatMap.externalCall(SubscribingInFlatMap.java:107)
at lrn.chap.SubscribingInFlatMap.lambda$example(SubscribingInFlatMap.java:91)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:386)
at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:156)
at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:111)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:371)
at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:69)
at reactor.core.publisher.Flux.subscribe(Flux.java:8468)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:8641)
at reactor.core.publisher.Flux.subscribe(Flux.java:8438)
Process finished with exit code 0
You can use Mono.defer() to turn it into a lazy publisher, like this:
Mono.defer(() -> Mono.fromCompletionStage(externalCall(i)))
    .doOnNext(ni -> System.out.println("onNext: " + ni))
    .doOnError(err -> System.err.println("onError: " + err.getMessage()))
    .onErrorResume(e -> Mono.empty())
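To see why the deferred form helps, here is a minimal plain-JDK sketch with no Reactor dependency. It is an analogy, not Reactor's implementation: the class EagerVsDeferred and the methods eager and deferred are hypothetical names, and externalCall is a deterministic stand-in that throws for even inputs. The point is that an argument expression is evaluated before the method that receives it runs, so a synchronous throw escapes any error handling inside that method, while a lambda postpones the call until the wrapper invokes it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

// Plain-JDK illustration (assumption: names here are hypothetical, not Reactor API).
class EagerVsDeferred {

    // Stand-in for the external call: throws synchronously for even inputs
    // instead of returning a failed CompletionStage.
    static CompletionStage<String> externalCall(int i) {
        if (i % 2 == 0) {
            throw new RuntimeException("Exception in external call");
        }
        return CompletableFuture.completedFuture(String.valueOf((char) (i + 64)));
    }

    // Eager: the caller evaluates externalCall(i) to build the argument,
    // so a synchronous throw happens before this method body is entered.
    static String eager(CompletionStage<String> stage) {
        try {
            return stage.toCompletableFuture().join();
        } catch (RuntimeException e) {
            return "recovered";
        }
    }

    // Deferred: the call runs inside the wrapper, where it can be caught.
    static String deferred(Supplier<CompletionStage<String>> call) {
        try {
            return call.get().toCompletableFuture().join();
        } catch (RuntimeException e) {
            return "recovered";
        }
    }

    public static void main(String[] args) {
        System.out.println(deferred(() -> externalCall(1)));
        System.out.println(deferred(() -> externalCall(2)));
        try {
            // The exception is thrown while evaluating the argument,
            // so the try/catch inside eager() never sees it.
            System.out.println(eager(externalCall(2)));
        } catch (RuntimeException e) {
            System.out.println("escaped: " + e.getMessage());
        }
    }
}
```

In the question's code, Mono.fromCompletionStage(externalCall(i)) is in the same position as eager(externalCall(2)): the throw happens inside the flatMap lambda before any Mono exists, so the operators chained on that Mono have nothing to react to.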
The defer approach mentioned in the other answer is a good option, but there is a shorter alternative: the overload of fromCompletionStage that takes a lambda (a Supplier):
Mono.fromCompletionStage(() -> externalCall(i))
    .doOnNext(ni -> System.out.println("onNext: " + ni))
    .doOnError(err -> System.err.println("onError: " + err.getMessage()))
    .onErrorResume(e -> Mono.empty())
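Just like defer, this ensures that externalCall is invoked on demand (at subscription time) and that its errors are handled as part of the reactive chain.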
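If the same external call is also used outside Reactor, another possible approach is to convert the synchronous throw into a failed CompletionStage at the call site. The sketch below uses only the JDK. SafeStage and capture are hypothetical names that belong to neither Reactor nor the JDK, and externalCall is again a deterministic stand-in that throws for even inputs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

// Hypothetical helper class, shown only as an illustration.
class SafeStage {

    // Runs the supplier and, if it throws synchronously, returns a stage
    // that is already completed exceptionally with that exception.
    static <T> CompletionStage<T> capture(Supplier<CompletionStage<T>> call) {
        try {
            return call.get();
        } catch (RuntimeException e) {
            CompletableFuture<T> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }

    // Stand-in for the external call: throws synchronously for even inputs.
    static CompletionStage<String> externalCall(int i) {
        if (i % 2 == 0) {
            throw new RuntimeException("Exception in external call");
        }
        return CompletableFuture.completedFuture(String.valueOf((char) (i + 64)));
    }

    public static void main(String[] args) {
        // The failure is now carried by the stage instead of escaping the call.
        String result = capture(() -> externalCall(2))
            .exceptionally(err -> "fallback")
            .toCompletableFuture()
            .join();
        System.out.println(result);
    }
}
```

Passing capture(() -> externalCall(i)) to Mono.fromCompletionStage should then surface the failure as an onError signal that onErrorResume can handle. Note that the call still runs eagerly when the pipeline is assembled, so the Supplier overload or Mono.defer remains the simpler choice when lazy invocation is wanted.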