How to handle exceptions for Mono.fromCompletionStage

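I need to convert a CompletionStage returned by an external library into a Mono inside a reactive pipeline. How do I handle exceptions thrown by that call? I want to ignore them and continue the sequence. When the external call throws, the onErrorResume / doOn* operators are never invoked (probably because the exception is thrown before the Mono is ever created).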

private void example() {
    Flux.range(1, 2)
            .flatMap(i ->
                    Mono.fromCompletionStage(externalCall(i))
                            .doOnNext(ni -> System.out.println("onNext: " + ni))
                            .doOnError(err -> System.err.println("onError: " + err.getMessage()))
                            .onErrorResume(e -> Mono.empty())
            )
            .subscribe();

    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

CompletionStage<String> externalCall(int i) {
    if (new Random().nextBoolean()){
        throw new RuntimeException("Exception in external call");
    }

    return Mono.just(i)
            .map(e -> String.valueOf((char) (e + 64)))
            .toFuture();
}

Stack trace

2021-10-11T02:16:09,944 main r.c.p.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Exception in external call
Caused by: java.lang.RuntimeException: Exception in external call
    at lrn.chap.SubscribingInFlatMap.externalCall(SubscribingInFlatMap.java:107)
    at lrn.chap.SubscribingInFlatMap.lambda$example(SubscribingInFlatMap.java:91)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:386)
    at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:156)
    at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:111)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:371)
    at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:69)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8468)
    at reactor.core.publisher.Flux.subscribeWith(Flux.java:8641)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8438)

Process finished with exit code 0

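You can use Mono.defer() to turn it into a lazy publisher. The call to externalCall then happens at subscription time, inside the reactive chain, instead of while the pipeline is being assembled: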

Mono.defer(() -> Mono.fromCompletionStage(externalCall(i)))
    .doOnNext(ni -> System.out.println("onNext: " + ni))
    .doOnError(err -> System.err.println("onError: " + err.getMessage()))
    .onErrorResume(e -> Mono.empty())

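The defer approach mentioned in the other answer is a good option, but there is a shorter alternative: the overload of fromCompletionStage that takes a lambda (a Supplier) instead of an already created CompletionStage: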

Mono.fromCompletionStage(() -> externalCall(i))
    .doOnNext(ni -> System.out.println("onNext: " + ni))
    .doOnError(err -> System.err.println("onError: " + err.getMessage()))
    .onErrorResume(e -> Mono.empty())

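Just like defer, this ensures that externalCall is invoked on demand (when the Mono is subscribed to) and that any error it throws is handled as part of the reactive chain.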
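
To illustrate why passing a lambda helps, here is a minimal JDK-only sketch of the same idea, without Reactor: the call is handed over as a Supplier, so it runs inside code that can catch a synchronous throw and turn it into a failed stage. The names callSafely and resolve are hypothetical helpers made up for this illustration, externalCall is a deterministic stand-in for the one in the question (it throws for even inputs instead of randomly), and CompletableFuture.failedFuture requires Java 9 or later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

public class DeferredCallSketch {

    // Stand-in for the external library call (assumption: throws
    // synchronously for even inputs so the behaviour is repeatable).
    static CompletionStage<String> externalCall(int i) {
        if (i % 2 == 0) {
            throw new RuntimeException("Exception in external call");
        }
        return CompletableFuture.completedFuture(String.valueOf((char) (i + 64)));
    }

    // Hypothetical helper: invokes the supplier inside a try/catch, so a
    // synchronous throw becomes a failed stage instead of escaping.
    static <T> CompletionStage<T> callSafely(Supplier<CompletionStage<T>> supplier) {
        try {
            return supplier.get();
        } catch (Throwable t) {
            return CompletableFuture.failedFuture(t);
        }
    }

    // Hypothetical helper: resolves the call and maps any failure to a marker value.
    static String resolve(int i) {
        return callSafely(() -> externalCall(i))
                .exceptionally(err -> "ignored: " + err.getMessage())
                .toCompletableFuture()
                .join();
    }

    public static void main(String[] args) {
        System.out.println(resolve(1)); // prints "A"
        System.out.println(resolve(2)); // prints "ignored: Exception in external call"
    }
}
```

In the original code, externalCall(i) is evaluated as a plain method argument, so its exception propagates out of the flatMap lambda before any Mono exists to carry it. Taking a Supplier moves the invocation to a place where the failure can be captured, which is what the lambda-based answers above rely on.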