Reactor onErrorContinue 操作符是否让原始序列继续?
Does Reactor onErrorContinue operator let the original sequence continue?
Reactor 错误处理文档 (https://projectreactor.io/docs/core/3.4.10/reference/index.html#error.handling) 指出错误处理运算符不会让原始序列继续。
Before you learn about error-handling operators, you must keep in mind that any error in a reactive sequence is a terminal event. Even if an error-handling operator is used, it does not let the original sequence continue. Rather, it converts the onError signal into the start of a new sequence (the fallback one). In other words, it replaces the terminated sequence upstream of it.
但是 onErrorContinue 的 javadoc 声明如下 (https://projectreactor.io/docs/core/3.4.10/api/index.html) -
Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements.
onErrorContinue 不被视为“错误处理运算符”吗?
它似乎确实允许原始序列继续 -
Flux.range(1, 5)
.map(i -> {
if (i == 3) {
throw new RuntimeException("Forcing exception for " + i);
}
return i;
})
.doOnNext(i -> System.out.println(i))
.onErrorContinue((throwable, o) -> System.err.println("Error while processing " + o + " - " + throwable.getMessage()))
.subscribe();
结果(丢弃 3 个但继续后续元素)
1
2
4
5
Error while processing 3 - Forcing exception for 3
Process finished with exit code 0
文档确实指出 onErrorContinue 依赖于操作员的支持。有没有其他方法可以让原始序列(来源 Flux)继续适用于所有操作员?我不希望在出现错误(onErrorResume 行为)时使用备用通量来替换我的源通量 - 我只想忽略问题元素并继续使用源通量。
编辑 1(我的用例)
我有一个 reactor kafka source flux & 我想无限地消耗它而不管错误。我使用的是 onErrorContinue,但根据收到的有关此 post 的反馈,我已将其替换为 onErrorResume。下面是我此时拥有的代码,但我不确定它是否适用于所有情况(通过“工作”,我从 kafka 连续流式传输,不管有任何错误)。有什么建议吗?
KafkaReceiver.create(receiverOptions)
.receive()
.flatMap(record -> processRequest(record.value())
.doOnNext(e -> record.receiverOffset().acknowledge())
.doOnError(e -> {
System.err.println("Error occurred for msg: " + record.value() + ", Error " + e);
record.receiverOffset().acknowledge();
})
.onErrorResume(e -> Mono.empty()))
.repeat(() -> true)
.retryWhen(Retry.indefinitely())
.doFinally(signalType -> {
//dont expect control to ever reach here
System.err.println("KafkaReceiverFlux terminating with Signal type: " + signalType);
})
.subscribe();
反应器遵循的反应流规范指出,流中的所有错误都是终端事件——这就是反应器错误处理文档的基础。为了处理错误,错误必须已经发生,并且根据规范,该错误必须是终止的。在所有符合规范的案例中(几乎所有总数案例)都是如此。
然而,onErrorContinue()
是一种相当 特殊的 类型的运算符。它是一个错误处理运算符,但它通过允许删除错误并继续流来打破反应式规范。在您希望连续处理、永不停止、带有错误边信道的情况下,它可能很有用。
也就是说,它有很多问题 - 不仅仅是它需要特定的操作员支持(因为完全符合反应流规范的操作员可能会完全无视 onErrorContinue()
但仍保持合规性),而且一大堆其他问题。如果您有兴趣阅读一些背景知识,我们中的一些人会讨论这些 here。将来可能会移至 unsafe()
分组或类似分组,但这是一个很难解决的问题。
话虽如此,核心建议是 Javadoc at the moment,即 而不是 在所有情况下使用 onErrorContinue()
,但非常具体的情况除外,而是在每个单独的发布者上使用 onErrorResume()
:
//Stream
.flatMap(id -> repository.retrieveById(id)
.doOnError(System.err::println)
.onErrorResume(e -> Mono.empty()))
这引入了更多的冗长和可能的小的性能损失(我没有验证过),但具有更清晰的行为优势,不会破坏反应流规范,并且不需要特定的操作员支持上班。这是我在几乎所有情况下都会推荐的 - 我个人觉得 onErrorContinue()
的微妙之处在大多数情况下太复杂而无法推理。
Reactor 错误处理文档 (https://projectreactor.io/docs/core/3.4.10/reference/index.html#error.handling) 指出错误处理运算符不会让原始序列继续。
Before you learn about error-handling operators, you must keep in mind that any error in a reactive sequence is a terminal event. Even if an error-handling operator is used, it does not let the original sequence continue. Rather, it converts the onError signal into the start of a new sequence (the fallback one). In other words, it replaces the terminated sequence upstream of it.
但是 onErrorContinue 的 javadoc 声明如下 (https://projectreactor.io/docs/core/3.4.10/api/index.html) -
Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements.
onErrorContinue 不被视为“错误处理运算符”吗?
它似乎确实允许原始序列继续 -
Flux.range(1, 5)
.map(i -> {
if (i == 3) {
throw new RuntimeException("Forcing exception for " + i);
}
return i;
})
.doOnNext(i -> System.out.println(i))
.onErrorContinue((throwable, o) -> System.err.println("Error while processing " + o + " - " + throwable.getMessage()))
.subscribe();
结果(丢弃 3 个但继续后续元素)
1
2
4
5
Error while processing 3 - Forcing exception for 3
Process finished with exit code 0
文档确实指出 onErrorContinue 依赖于操作员的支持。有没有其他方法可以让原始序列(来源 Flux)继续适用于所有操作员?我不希望在出现错误(onErrorResume 行为)时使用备用通量来替换我的源通量 - 我只想忽略问题元素并继续使用源通量。
编辑 1(我的用例)
我有一个 reactor kafka source flux & 我想无限地消耗它而不管错误。我使用的是 onErrorContinue,但根据收到的有关此 post 的反馈,我已将其替换为 onErrorResume。下面是我此时拥有的代码,但我不确定它是否适用于所有情况(通过“工作”,我从 kafka 连续流式传输,不管有任何错误)。有什么建议吗?
KafkaReceiver.create(receiverOptions)
.receive()
.flatMap(record -> processRequest(record.value())
.doOnNext(e -> record.receiverOffset().acknowledge())
.doOnError(e -> {
System.err.println("Error occurred for msg: " + record.value() + ", Error " + e);
record.receiverOffset().acknowledge();
})
.onErrorResume(e -> Mono.empty()))
.repeat(() -> true)
.retryWhen(Retry.indefinitely())
.doFinally(signalType -> {
//dont expect control to ever reach here
System.err.println("KafkaReceiverFlux terminating with Signal type: " + signalType);
})
.subscribe();
反应器遵循的反应流规范指出,流中的所有错误都是终端事件——这就是反应器错误处理文档的基础。为了处理错误,错误必须已经发生,并且根据规范,该错误必须是终止的。在所有符合规范的案例中(几乎所有总数案例)都是如此。
然而,onErrorContinue()
是一种相当 特殊的 类型的运算符。它是一个错误处理运算符,但它通过允许删除错误并继续流来打破反应式规范。在您希望连续处理、永不停止、带有错误边信道的情况下,它可能很有用。
也就是说,它有很多问题 - 不仅仅是它需要特定的操作员支持(因为完全符合反应流规范的操作员可能会完全无视 onErrorContinue()
但仍保持合规性),而且一大堆其他问题。如果您有兴趣阅读一些背景知识,我们中的一些人会讨论这些 here。将来可能会移至 unsafe()
分组或类似分组,但这是一个很难解决的问题。
话虽如此,核心建议是 Javadoc at the moment,即 而不是 在所有情况下使用 onErrorContinue()
,但非常具体的情况除外,而是在每个单独的发布者上使用 onErrorResume()
:
//Stream
.flatMap(id -> repository.retrieveById(id)
.doOnError(System.err::println)
.onErrorResume(e -> Mono.empty()))
这引入了更多的冗长和可能的小的性能损失(我没有验证过),但具有更清晰的行为优势,不会破坏反应流规范,并且不需要特定的操作员支持上班。这是我在几乎所有情况下都会推荐的 - 我个人觉得 onErrorContinue()
的微妙之处在大多数情况下太复杂而无法推理。