Project Reactor + flatMap + Multiple onErrorComplete - 未按预期工作

Project Reactor + flatMap + Multiple onErrorComplete - Not working as expected

当多个 onErrorContinue added to the pipeline to handle specific type of exception thrown from flatMap 时,异常处理未按预期工作。

我预计,在下面的代码中,元素 1 到 6 应该被删除,元素 7 到 10 应该被订阅者使用。

public class FlatMapOnErrorContinueExample {
    public static void main(String[] args) {
        Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                .flatMap(number -> {
                    if (number <= 3) {
                        return Mono.error(new NumberLesserThanThree("Number is lesser than 3"));
                    } else if (number > 3 && number <= 6) {
                        return Mono.error(new NumberLesserThanSixButGretherThan3("Number is grether than 6"));
                    } else {
                        return Mono.just(number);
                    }
                })
                .onErrorContinue(NumberLesserThanThree.class,
                        (throwable, object) -> System.err.println("Exception: Dropping the element because it is lesser than 3"))

                .onErrorContinue(NumberLesserThanSixButGretherThan3.class,
                        (throwable, object) -> System.err.println("Exception: Dropping the element because it is lesser than 6 but grether than 3"))

                .onErrorContinue((throwable, object) ->
                        System.err.println("Exception: " + throwable.getMessage()))

                .subscribe(number -> System.out.println("number is " + number),
                        error -> System.err.println("Exception in Subscription " + error.getMessage()));
    }

    public static class NumberLesserThanThree extends RuntimeException {
        public NumberLesserThanThree(final String msg) {
            super(msg);
        }
    }

    public static class NumberLesserThanSixButGretherThan3 extends RuntimeException {
        public NumberLesserThanSixButGretherThan3(final String msg) {
            super(msg);
        }
    }
}

这是我得到的输出:

Exception: Dropping the element because it is lesser than 3
Exception: Dropping the element because it is lesser than 3
Exception: Dropping the element because it is lesser than 3
Exception in Subscription Number is grether than 6

问题:为什么第2个onErrorContinue没有被调用,异常发送给订阅者?

补充说明: 如果我删除 1st 和 2nd onErrorContinue,那么所有异常都由 3rd onErrorContinue 处理。我可以使用这种方法来接收所有异常并检查异常类型并继续处理。但是,我想让它更干净的异常处理而不是添加 if..else 块。

这个问题与

有何不同

1)这个关于异常处理的问题,以及异常处理的顺序;另一个问题是关于并行处理元素并使主线程等待所有元素处理完成 3)这个问题与线程无关,即使在. subscribe之后添加Thread.sleep(10000),行为也没有变化。

这又归结为 onErrorContinue 的异常行为。它打破了规则,因为它不会 "catch" 错误然后改变下游的行为,它实际上允许支持操作员 "look ahead" 并相应地表现,从而改变结果 上游.

这很奇怪,会导致一些不是很明显的行为,例如这里的情况。据我所知,所有支持运算符只向前看 next onErrorContinue 运算符,而不是递归地向前搜索所有此类运算符。相反,它们将评估下一个 onErrorContinue 的谓词(在本例中它是否属于某种类型),然后相应地进行操作——如果谓词 return 为真则调用处理程序,或者抛出如果不是,下游错误。 (在任何情况下它都会移动到 next onErrorContinue 运算符,然后是下一个,直到匹配谓词。)

显然这是一个人为的例子 - 但由于这些特质,我几乎总是建议避免 onErrorContinue。涉及 flatMap() 的地方有两种 "normal" 方式:

  1. 如果 flatMap() 中有一个 "inner reactive chain",那就是它调用了 return 发布者的另一个方法或一系列方法 - 然后只需使用 onErrorResume()flatMap() 调用结束时处理这些错误。您 可以 链接 onErrorResume() 因为它与下游而不是上游运营商合作。这是迄今为止最常见的情况。

  2. If flatMap() 是 if / else 的命令式集合,它是 returning 不同的发布者,例如它在这里并且你想要/必须保持命令式风格,抛出异常而不是使用 Mono.error(),并适当地捕获,returning Mono.empty() 以防出现错误:

    .flatMap(number -> {
        try {
            if (number <= 3) {
                throw new NumberLessThanThree();
            } else if (number <= 6) {
                throw new NumberLessThanSixButGreaterThan3();
            } else {
                return Mono.just(number);
            }
        }
        catch(NumberLessThanThree ex) {
            //Handle it
            return Mono.empty();
        }
        catch(NumberLessThanSixButGreaterThan3 ex) {
            //As above
        }
    })

一般来说,使用这两种方法中的一种会使更容易推断出正在发生的事情。

(为了阅读评论的完整性 - 这与反应链无法在主线程退出之前完成没有任何关系。)