FluxSink.next() throw error/exception,如果用户在处理器中抛出错误
FluxSink.next() throw error/exception, if error thrown in processor by user
Flux.<Integer>push(sink -> {
try {
for (int i = 0; i < 10 && !sink.isCancelled(); i++) {
sink.next(i);
}
sink.complete();
} catch (Exception e) {
System.out.println("Error in wrong place " + e);
}
})
.doOnNext(i -> { if (i % 5 == 0) throw new RuntimeException("Error"); })
.subscribe(i -> System.out.println("i = " + i));
这段代码的输出是
`Error in wrong place reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Error`
表示异常在push方法中被sink.next(i);
重新抛出,而不是推送到error reactive channel。为什么?
它被推送到flux error频道,但是你没有错误订阅者在这个频道,你可以添加:
.doOnError(e -> System.out.println("Now error in good place = " + e))
或
.subscribe(i -> System.out.println("i = " + i), e -> System.out.println("Now error in good place = " + e));
完整代码为:
Flux.<Integer>push(sink -> {
try {
for (int i = 0; i < 10 && !sink.isCancelled(); i++) {
sink.next(i);
}
sink.complete();
} catch (Exception e) {
System.out.println("Error in wrong place " + e);
}
})
.doOnNext(i -> { if (i % 5 == 0) throw new RuntimeException("Error"); })
.doOnError(e -> System.out.println("Now error in good place = " + e)) //it's missing
.subscribe(i -> System.out.println("i = " + i));
另请参阅此答案,为什么您需要订户和标准 (fox next()) subscriber
不会引发错误:What is correct way to generate and handle exceptions when using Flux from projectreactor
Flux.<Integer>push(sink -> {
try {
for (int i = 0; i < 10 && !sink.isCancelled(); i++) {
sink.next(i);
}
sink.complete();
} catch (Exception e) {
System.out.println("Error in wrong place " + e);
}
})
.doOnNext(i -> { if (i % 5 == 0) throw new RuntimeException("Error"); })
.subscribe(i -> System.out.println("i = " + i));
这段代码的输出是
`Error in wrong place reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Error`
表示异常在push方法中被sink.next(i);
重新抛出,而不是推送到error reactive channel。为什么?
它被推送到flux error频道,但是你没有错误订阅者在这个频道,你可以添加:
.doOnError(e -> System.out.println("Now error in good place = " + e))
或
.subscribe(i -> System.out.println("i = " + i), e -> System.out.println("Now error in good place = " + e));
完整代码为:
Flux.<Integer>push(sink -> {
try {
for (int i = 0; i < 10 && !sink.isCancelled(); i++) {
sink.next(i);
}
sink.complete();
} catch (Exception e) {
System.out.println("Error in wrong place " + e);
}
})
.doOnNext(i -> { if (i % 5 == 0) throw new RuntimeException("Error"); })
.doOnError(e -> System.out.println("Now error in good place = " + e)) //it's missing
.subscribe(i -> System.out.println("i = " + i));
另请参阅此答案,为什么您需要订户和标准 (fox next()) subscriber
不会引发错误:What is correct way to generate and handle exceptions when using Flux from projectreactor