FluxSink.next() throw error/exception,如果用户在处理器中抛出错误

FluxSink.next() throw error/exception, if error thrown in processor by user

   Flux.<Integer>push(sink -> {
        try {
            for (int i = 0; i < 10 && !sink.isCancelled(); i++) {
                sink.next(i);
            }
            sink.complete();
        } catch (Exception e) {
            System.out.println("Error in wrong place " + e);
        }
    })
            .doOnNext(i -> { if (i % 5 == 0) throw new RuntimeException("Error"); })
            .subscribe(i -> System.out.println("i = " + i));

这段代码的输出是

  `Error in wrong place reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Error` 

表示异常在push方法中被sink.next(i);重新抛出,而不是推送到error reactive channel。为什么?

它被推送到flux error频道,但是你没有错误订阅者在这个频道,你可以添加:

.doOnError(e -> System.out.println("Now error in good place = " + e))

.subscribe(i -> System.out.println("i = " + i), e -> System.out.println("Now error in good place = " + e));

完整代码为:

   Flux.<Integer>push(sink -> {
        try {
            for (int i = 0; i < 10 && !sink.isCancelled(); i++) {
                sink.next(i);
            }
            sink.complete();
        } catch (Exception e) {
            System.out.println("Error in wrong place " + e);
        }
    })
            .doOnNext(i -> { if (i % 5 == 0) throw new RuntimeException("Error"); })
            .doOnError(e -> System.out.println("Now error in good place = " + e)) //it's missing
            .subscribe(i -> System.out.println("i = " + i));

另请参阅此答案,为什么您需要订户和标准 (fox next()) subscriber 不会引发错误:What is correct way to generate and handle exceptions when using Flux from projectreactor