Spring Reactor onErrorContinue 不工作

Spring Reactor onErrorContinue not working

根据 documentation 我预计 onErrorContinue 将忽略错误元素并继续序列。以下测试用例失败并出现异常

java.lang.AssertionError: expectation "expectNext(12)" failed (expected: onNext(12); actual: onError(java.lang.RuntimeException:

@Test
public void testOnErrorContinue() throws InterruptedException {
    Flux<Integer> fluxFromJust = Flux.just(1, 2,3,4,5)
            .concatWith(Flux.error(new RuntimeException("Test")))
            .concatWith(Flux.just(6))
            .map(i->i*2)
            .onErrorContinue((e,i)->{
                System.out.println("Error For Item +" + i );
            })
            ;
    StepVerifier
            .create(fluxFromJust)
            .expectNext(2, 4,6,8,10)
            .expectNext(12)
            .verifyComplete();
}

onErrorContinue() 可能并没有按照您的想法行事 - 它允许上游操作员 从他们内部可能发生的错误中恢复 ,如果他们碰巧支持这样做的话.是比较专业的运营商。

在这种情况下 map() 实际上支持 onErrorContinue,但是 map 实际上并没有产生错误——错误已经被插入到流中(通过 concat()和显式 Flux.error() 调用。)换句话说,根本没有运算符产生错误,因此它无法从中恢复,因为提供的元素是错误的。

如果您更改了流,使得 map() 实际上 导致了 错误,那么它将按预期工作:

Flux.just(1, 2,3,4,5)
        .map(x -> {
            if(x==5) {
                throw new RuntimeException();
            }
            return x*2;
        })
        .onErrorContinue((e,i)->{
            System.out.println("Error For Item +" + i );
        })
        .subscribe(System.out::println);

产生:

2
4
6
8
Error For Item +5

根据实际用例的替代方法可能是在可能错误的元素(或元素源)之后使用 onErrorResume()

Flux.just(1, 2, 3, 4, 5)
        .concatWith(Flux.error(new RuntimeException()))
        .onErrorResume(e -> {
            System.out.println("Error " + e + ", ignoring");
            return Mono.empty();
        })
        .concatWith(Flux.just(6))
        .map(i -> i * 2)
        .subscribe(System.out::println);

一般来说,使用另一个 "onError" 运算符(例如 onErrorResume())通常是更常用和更推荐的方法,因为 onErrorContinue() 依赖于运算符支持并影响上游,而不是下游运营商(这很不寻常。)