WebFlux: onErrorContinue does not continue

I have the following Flux:

Flux.generate(() -> 1, (n, sink) -> {
            if(n==10){
                sink.complete();
            }
            if(n==2) throw new RuntimeException("some error");
            sink.next(n);
            return n+1;
        })
        .onErrorContinue(
                RuntimeException.class,
                (e, o) -> System.out.println("ERROR")
        )
        .subscribe(System.out::println);

I want to get the output 1, "ERROR", 3, 4, ..., but the sequence does not continue: the Flux emits only 1 and then terminates with

[ERROR] (main) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: some error

Does anyone know how to skip such error signals and continue the Flux?

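It seems you cannot throw exceptions from the Flux::generate callback and have onErrorContinue recover from them. Throw from a downstream operator such as doOnNext instead: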

Flux.<Integer, Integer>generate(() -> 1, (n, sink) -> {
            if (n == 10) {
                sink.complete();
                return n; // stop here so next() is not called after complete()
            }
            sink.next(n);
            return n + 1;
        })
        // The failing check now lives in an operator that supports onErrorContinue
        .doOnNext(n -> {
            if (n == 2) throw new RuntimeException("some error");
        })
        .onErrorContinue(
                RuntimeException.class,
                (e, o) -> System.out.println("ERROR")
        )
        .subscribe(System.out::println);

onErrorContinue() is a very specialized operator that only works with operators that explicitly support it.

See the Reactor documentation for details:

Note that onErrorContinue() is a specialist operator that can make the behaviour of your reactive chain unclear. It operates on upstream, not downstream operators, it requires specific operator support to work, and the scope can easily propagate upstream into library code that didn't anticipate it (resulting in unintended behaviour.)
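
To illustrate the "upstream, not downstream" part of that note, here is a minimal sketch. The class and method names are made up for this example, and it assumes reactor-core is on the classpath and that no global Hooks have been installed. The same failing map() is placed once before and once after onErrorContinue():

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class OnErrorContinueScope {

    // Failing map() sits upstream of onErrorContinue(): the bad element is
    // handed to the callback and the sequence carries on.
    static List<String> upstreamFailure() {
        List<String> log = new ArrayList<>();
        Flux.range(1, 4)
            .map(n -> {
                if (n == 2) throw new IllegalStateException("boom");
                return n;
            })
            .onErrorContinue((e, value) -> log.add("skipped " + value))
            .subscribe(n -> log.add("next " + n), e -> log.add("error"));
        return log;
    }

    // Failing map() sits downstream of onErrorContinue(): it is outside the
    // operator's scope, so the error terminates the sequence as usual.
    static List<String> downstreamFailure() {
        List<String> log = new ArrayList<>();
        Flux.range(1, 4)
            .onErrorContinue((e, value) -> log.add("skipped " + value))
            .map(n -> {
                if (n == 2) throw new IllegalStateException("boom");
                return n;
            })
            .subscribe(n -> log.add("next " + n), e -> log.add("error"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(upstreamFailure());
        System.out.println(downstreamFailure());
    }
}
```

In the first method the element 2 is skipped and 3 and 4 still arrive. In the second, the subscriber's error callback fires after 1.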

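In most cases it is better to use onErrorResume() and return Mono.empty() to achieve similar behaviour. Note that a fallback applied to the outer Flux replaces the rest of the sequence, so to skip only the failing element, apply it to an inner publisher, for example inside concatMap():

Flux.<Integer, Integer>generate(() -> 1, (n, sink) -> {
        if (n == 10) {
            sink.complete();
            return n; // stop here so next() is not called after complete()
        }
        sink.next(n);
        return n + 1;
    })
    // Run the failing step in an inner Mono so that an error ends only that Mono
    .concatMap(n -> Mono.just(n)
        .doOnNext(v -> {
            if (v == 2) throw new RuntimeException("some error");
        })
        .onErrorResume(RuntimeException.class, e -> {
            System.out.println("ERROR");
            return Mono.empty();
        }))
    .subscribe(System.out::println);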
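
Where onErrorResume() is placed decides whether the sequence ends or merely skips an element. The following is a small sketch of that difference, not a definitive recipe. The class name, the check() helper, and the use of Flux.range() in place of the generator are assumptions made for brevity, and reactor-core is assumed to be on the classpath:

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class OnErrorResumePlacement {

    // Hypothetical processing step that fails for the value 2.
    static Integer check(Integer n) {
        if (n == 2) throw new RuntimeException("some error");
        return n;
    }

    // Fallback on the outer Flux: the error ends the source, and the empty
    // fallback then completes the whole sequence.
    static List<Integer> outerFallback() {
        return Flux.range(1, 5)
            .map(OnErrorResumePlacement::check)
            .onErrorResume(RuntimeException.class, e -> Mono.empty())
            .collectList()
            .block();
    }

    // Fallback on an inner Mono: only the failing element is replaced by
    // an empty Mono, and the outer sequence keeps going.
    static List<Integer> innerFallback() {
        return Flux.range(1, 5)
            .concatMap(n -> Mono.fromCallable(() -> check(n))
                .onErrorResume(RuntimeException.class, e -> Mono.empty()))
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        System.out.println(outerFallback());
        System.out.println(innerFallback());
    }
}
```

Unlike onErrorContinue(), the inner variant relies only on ordinary error handling, so it behaves the same regardless of which operators sit upstream.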