Project Reactor + flatMap + Multiple onErrorComplete - 未按预期工作
Project Reactor + flatMap + Multiple onErrorComplete - Not working as expected
当多个 onErrorContinue added to the pipeline to handle specific type of exception thrown from flatMap 时,异常处理未按预期工作。
我预计,在下面的代码中,元素 1 到 6 应该被删除,元素 7 到 10 应该被订阅者使用。
public class FlatMapOnErrorContinueExample {
public static void main(String[] args) {
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.flatMap(number -> {
if (number <= 3) {
return Mono.error(new NumberLesserThanThree("Number is lesser than 3"));
} else if (number > 3 && number <= 6) {
return Mono.error(new NumberLesserThanSixButGretherThan3("Number is grether than 6"));
} else {
return Mono.just(number);
}
})
.onErrorContinue(NumberLesserThanThree.class,
(throwable, object) -> System.err.println("Exception: Dropping the element because it is lesser than 3"))
.onErrorContinue(NumberLesserThanSixButGretherThan3.class,
(throwable, object) -> System.err.println("Exception: Dropping the element because it is lesser than 6 but grether than 3"))
.onErrorContinue((throwable, object) ->
System.err.println("Exception: " + throwable.getMessage()))
.subscribe(number -> System.out.println("number is " + number),
error -> System.err.println("Exception in Subscription " + error.getMessage()));
}
public static class NumberLesserThanThree extends RuntimeException {
public NumberLesserThanThree(final String msg) {
super(msg);
}
}
public static class NumberLesserThanSixButGretherThan3 extends RuntimeException {
public NumberLesserThanSixButGretherThan3(final String msg) {
super(msg);
}
}
}
这是我得到的输出:
Exception: Dropping the element because it is lesser than 3
Exception: Dropping the element because it is lesser than 3
Exception: Dropping the element because it is lesser than 3
Exception in Subscription Number is grether than 6
问题:为什么第2个onErrorContinue
没有被调用,异常发送给订阅者?
补充说明:
如果我删除 1st 和 2nd onErrorContinue
,那么所有异常都由 3rd onErrorContinue
处理。我可以使用这种方法来接收所有异常并检查异常类型并继续处理。但是,我想让它更干净的异常处理而不是添加 if..else
块。
这个问题与
有何不同
1)这个关于异常处理的问题,以及异常处理的顺序;另一个问题是关于并行处理元素并使主线程等待所有元素处理完成
3)这个问题与线程无关,即使在. subscribe
之后添加Thread.sleep(10000)
,行为也没有变化。
这又归结为 onErrorContinue
的异常行为。它打破了规则,因为它不会 "catch" 错误然后改变下游的行为,它实际上允许支持操作员 "look ahead" 并相应地表现,从而改变结果 上游.
这很奇怪,会导致一些不是很明显的行为,例如这里的情况。据我所知,所有支持运算符只向前看 next onErrorContinue
运算符,而不是递归地向前搜索所有此类运算符。相反,它们将评估下一个 onErrorContinue
的谓词(在本例中它是否属于某种类型),然后相应地进行操作——如果谓词 return 为真则调用处理程序,或者抛出如果不是,下游错误。 (在任何情况下它都会移动到 next onErrorContinue
运算符,然后是下一个,直到匹配谓词。)
显然这是一个人为的例子 - 但由于这些特质,我几乎总是建议避免 onErrorContinue
。涉及 flatMap()
的地方有两种 "normal" 方式:
如果 flatMap()
中有一个 "inner reactive chain",那就是它调用了 return 发布者的另一个方法或一系列方法 - 然后只需使用 onErrorResume()
在 flatMap()
调用结束时处理这些错误。您 可以 链接 onErrorResume()
因为它与下游而不是上游运营商合作。这是迄今为止最常见的情况。
If flatMap()
是 if / else 的命令式集合,它是 returning 不同的发布者,例如它在这里并且你想要/必须保持命令式风格,抛出异常而不是使用 Mono.error()
,并适当地捕获,returning Mono.empty()
以防出现错误:
.flatMap(number -> {
try {
if (number <= 3) {
throw new NumberLessThanThree();
} else if (number <= 6) {
throw new NumberLessThanSixButGreaterThan3();
} else {
return Mono.just(number);
}
}
catch(NumberLessThanThree ex) {
//Handle it
return Mono.empty();
}
catch(NumberLessThanSixButGreaterThan3 ex) {
//As above
}
})
一般来说,使用这两种方法中的一种会使更更容易推断出正在发生的事情。
(为了阅读评论的完整性 - 这与反应链无法在主线程退出之前完成没有任何关系。)
当多个 onErrorContinue added to the pipeline to handle specific type of exception thrown from flatMap 时,异常处理未按预期工作。
我预计,在下面的代码中,元素 1 到 6 应该被删除,元素 7 到 10 应该被订阅者使用。
public class FlatMapOnErrorContinueExample {
public static void main(String[] args) {
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.flatMap(number -> {
if (number <= 3) {
return Mono.error(new NumberLesserThanThree("Number is lesser than 3"));
} else if (number > 3 && number <= 6) {
return Mono.error(new NumberLesserThanSixButGretherThan3("Number is grether than 6"));
} else {
return Mono.just(number);
}
})
.onErrorContinue(NumberLesserThanThree.class,
(throwable, object) -> System.err.println("Exception: Dropping the element because it is lesser than 3"))
.onErrorContinue(NumberLesserThanSixButGretherThan3.class,
(throwable, object) -> System.err.println("Exception: Dropping the element because it is lesser than 6 but grether than 3"))
.onErrorContinue((throwable, object) ->
System.err.println("Exception: " + throwable.getMessage()))
.subscribe(number -> System.out.println("number is " + number),
error -> System.err.println("Exception in Subscription " + error.getMessage()));
}
public static class NumberLesserThanThree extends RuntimeException {
public NumberLesserThanThree(final String msg) {
super(msg);
}
}
public static class NumberLesserThanSixButGretherThan3 extends RuntimeException {
public NumberLesserThanSixButGretherThan3(final String msg) {
super(msg);
}
}
}
这是我得到的输出:
Exception: Dropping the element because it is lesser than 3
Exception: Dropping the element because it is lesser than 3
Exception: Dropping the element because it is lesser than 3
Exception in Subscription Number is grether than 6
问题:为什么第2个onErrorContinue
没有被调用,异常发送给订阅者?
补充说明:
如果我删除 1st 和 2nd onErrorContinue
,那么所有异常都由 3rd onErrorContinue
处理。我可以使用这种方法来接收所有异常并检查异常类型并继续处理。但是,我想让它更干净的异常处理而不是添加 if..else
块。
这个问题与
1)这个关于异常处理的问题,以及异常处理的顺序;另一个问题是关于并行处理元素并使主线程等待所有元素处理完成
3)这个问题与线程无关,即使在. subscribe
之后添加Thread.sleep(10000)
,行为也没有变化。
这又归结为 onErrorContinue
的异常行为。它打破了规则,因为它不会 "catch" 错误然后改变下游的行为,它实际上允许支持操作员 "look ahead" 并相应地表现,从而改变结果 上游.
这很奇怪,会导致一些不是很明显的行为,例如这里的情况。据我所知,所有支持运算符只向前看 next onErrorContinue
运算符,而不是递归地向前搜索所有此类运算符。相反,它们将评估下一个 onErrorContinue
的谓词(在本例中它是否属于某种类型),然后相应地进行操作——如果谓词 return 为真则调用处理程序,或者抛出如果不是,下游错误。 (在任何情况下它都会移动到 next onErrorContinue
运算符,然后是下一个,直到匹配谓词。)
显然这是一个人为的例子 - 但由于这些特质,我几乎总是建议避免 onErrorContinue
。涉及 flatMap()
的地方有两种 "normal" 方式:
如果
flatMap()
中有一个 "inner reactive chain",那就是它调用了 return 发布者的另一个方法或一系列方法 - 然后只需使用onErrorResume()
在flatMap()
调用结束时处理这些错误。您 可以 链接onErrorResume()
因为它与下游而不是上游运营商合作。这是迄今为止最常见的情况。If
flatMap()
是 if / else 的命令式集合,它是 returning 不同的发布者,例如它在这里并且你想要/必须保持命令式风格,抛出异常而不是使用Mono.error()
,并适当地捕获,returningMono.empty()
以防出现错误:
.flatMap(number -> {
try {
if (number <= 3) {
throw new NumberLessThanThree();
} else if (number <= 6) {
throw new NumberLessThanSixButGreaterThan3();
} else {
return Mono.just(number);
}
}
catch(NumberLessThanThree ex) {
//Handle it
return Mono.empty();
}
catch(NumberLessThanSixButGreaterThan3 ex) {
//As above
}
})
一般来说,使用这两种方法中的一种会使更更容易推断出正在发生的事情。
(为了阅读评论的完整性 - 这与反应链无法在主线程退出之前完成没有任何关系。)