Spring Reactor onErrorContinue 不工作
Spring Reactor onErrorContinue not working
根据 documentation 我预计 onErrorContinue
将忽略错误元素并继续序列。以下测试用例失败并出现异常
java.lang.AssertionError: expectation "expectNext(12)" failed (expected: onNext(12); actual: onError(java.lang.RuntimeException:
@Test
public void testOnErrorContinue() throws InterruptedException {
Flux<Integer> fluxFromJust = Flux.just(1, 2,3,4,5)
.concatWith(Flux.error(new RuntimeException("Test")))
.concatWith(Flux.just(6))
.map(i->i*2)
.onErrorContinue((e,i)->{
System.out.println("Error For Item +" + i );
})
;
StepVerifier
.create(fluxFromJust)
.expectNext(2, 4,6,8,10)
.expectNext(12)
.verifyComplete();
}
onErrorContinue()
可能并没有按照您的想法行事 - 它允许上游操作员 从他们内部可能发生的错误中恢复 ,如果他们碰巧支持这样做的话.是比较专业的运营商。
在这种情况下 map()
实际上支持 onErrorContinue
,但是 map 实际上并没有产生错误——错误已经被插入到流中(通过 concat()
和显式 Flux.error()
调用。)换句话说,根本没有运算符产生错误,因此它无法从中恢复,因为提供的元素是错误的。
如果您更改了流,使得 map()
实际上 导致了 错误,那么它将按预期工作:
Flux.just(1, 2,3,4,5)
.map(x -> {
if(x==5) {
throw new RuntimeException();
}
return x*2;
})
.onErrorContinue((e,i)->{
System.out.println("Error For Item +" + i );
})
.subscribe(System.out::println);
产生:
2
4
6
8
Error For Item +5
根据实际用例的替代方法可能是在可能错误的元素(或元素源)之后使用 onErrorResume()
:
Flux.just(1, 2, 3, 4, 5)
.concatWith(Flux.error(new RuntimeException()))
.onErrorResume(e -> {
System.out.println("Error " + e + ", ignoring");
return Mono.empty();
})
.concatWith(Flux.just(6))
.map(i -> i * 2)
.subscribe(System.out::println);
一般来说,使用另一个 "onError" 运算符(例如 onErrorResume()
)通常是更常用和更推荐的方法,因为 onErrorContinue()
依赖于运算符支持并影响上游,而不是下游运营商(这很不寻常。)
根据 documentation 我预计 onErrorContinue
将忽略错误元素并继续序列。以下测试用例失败并出现异常
java.lang.AssertionError: expectation "expectNext(12)" failed (expected: onNext(12); actual: onError(java.lang.RuntimeException:
@Test
public void testOnErrorContinue() throws InterruptedException {
Flux<Integer> fluxFromJust = Flux.just(1, 2,3,4,5)
.concatWith(Flux.error(new RuntimeException("Test")))
.concatWith(Flux.just(6))
.map(i->i*2)
.onErrorContinue((e,i)->{
System.out.println("Error For Item +" + i );
})
;
StepVerifier
.create(fluxFromJust)
.expectNext(2, 4,6,8,10)
.expectNext(12)
.verifyComplete();
}
onErrorContinue()
可能并没有按照您的想法行事 - 它允许上游操作员 从他们内部可能发生的错误中恢复 ,如果他们碰巧支持这样做的话.是比较专业的运营商。
在这种情况下 map()
实际上支持 onErrorContinue
,但是 map 实际上并没有产生错误——错误已经被插入到流中(通过 concat()
和显式 Flux.error()
调用。)换句话说,根本没有运算符产生错误,因此它无法从中恢复,因为提供的元素是错误的。
如果您更改了流,使得 map()
实际上 导致了 错误,那么它将按预期工作:
Flux.just(1, 2,3,4,5)
.map(x -> {
if(x==5) {
throw new RuntimeException();
}
return x*2;
})
.onErrorContinue((e,i)->{
System.out.println("Error For Item +" + i );
})
.subscribe(System.out::println);
产生:
2
4
6
8
Error For Item +5
根据实际用例的替代方法可能是在可能错误的元素(或元素源)之后使用 onErrorResume()
:
Flux.just(1, 2, 3, 4, 5)
.concatWith(Flux.error(new RuntimeException()))
.onErrorResume(e -> {
System.out.println("Error " + e + ", ignoring");
return Mono.empty();
})
.concatWith(Flux.just(6))
.map(i -> i * 2)
.subscribe(System.out::println);
一般来说,使用另一个 "onError" 运算符(例如 onErrorResume()
)通常是更常用和更推荐的方法,因为 onErrorContinue()
依赖于运算符支持并影响上游,而不是下游运营商(这很不寻常。)