Reactor 有没有办法忽略错误信号?
Is there a way in Reactor to ignore error signals?
我有一个包含多个 URL 和端口的数组。对于他们每个人,我都需要发送和接收一些东西:
Flux.fromArray(trackersArray)
.flatMap(tracker ->
ConnectToTracker.connect(tracker.getTracker(), tracker.getPort()))
我与 UDP
中的服务器通信,所以我无法判断服务器是否处于活动状态,除非我发送一条“根据某些规则集需要响应”的消息
ConnectToTracker.connect
可能会在服务器响应时发送 onNext
信号,或者在服务器未响应(SocketTimeOutException
)或任何其他情况下发送 onError
信号其他故障(一般IOException
)。
我不想终止 flux
,例如,如果 onError
信号等于 SocketTimeOutException
。 相反,我想尝试与我得到的每个跟踪器进行通信。
This link 包含我可以用来处理错误的所有操作 但不能忽略它们。
如果这很重要,我正在使用 Reactor 3
。
更新:
我做了一个丑陋的技巧,但有效:
Flux.fromArray(trackersArray)
.handle((Tracker tracker, SynchronousSink<ConnectResponse> sink) -> {
ConnectToTracker.connect(tracker.getTracker(), tracker.getPort())
.subscribe(sink::next, error -> {
if (!(error instanceof SocketTimeoutException))
sink.error(error);
}, sink::complete);
})
有什么更好的请随意填写回答
由于您已经在平面图中处理 url,因此请使用 onErrorResume(e -> Mono.empty())
。这将使 flatmap 忽略错误。
编辑:在平面图中,在 lambda
的右侧
Flux.fromArray(trackersArray)
.flatMap(tracker ->
ConnectToTracker.connect(tracker.getTracker(), tracker.getPort())
.onErrorResume(SocketTimeoutException.class, __ -> Mono.empty()))
也许这样做更好,它会从 SocketTimeOut 恢复,如果异常是其他的,我会选择 onError
现在我们在 3.3.2
版本中有 reactor.core.publisher.onErrorContinue()
,它允许您在某些元素为 onError()
时发送 onNext()
信号。使用log()
你会看得更清楚。
签名是 (throwable, instance)
所以如果你想记录错误的签名,这很有用。
Flux.fromIterable(aList)
.flatMap(this::xxxx)
.onErrorContinue((throwable, o) -> {
log.error("Error while processing {}. Cause: {}", o, throwable.getMessage());
})
....
我有一个包含多个 URL 和端口的数组。对于他们每个人,我都需要发送和接收一些东西:
Flux.fromArray(trackersArray)
.flatMap(tracker ->
ConnectToTracker.connect(tracker.getTracker(), tracker.getPort()))
我与 UDP
中的服务器通信,所以我无法判断服务器是否处于活动状态,除非我发送一条“根据某些规则集需要响应”的消息
ConnectToTracker.connect
可能会在服务器响应时发送 onNext
信号,或者在服务器未响应(SocketTimeOutException
)或任何其他情况下发送 onError
信号其他故障(一般IOException
)。
我不想终止 flux
,例如,如果 onError
信号等于 SocketTimeOutException
。 相反,我想尝试与我得到的每个跟踪器进行通信。
This link 包含我可以用来处理错误的所有操作 但不能忽略它们。
如果这很重要,我正在使用 Reactor 3
。
更新:
我做了一个丑陋的技巧,但有效:
Flux.fromArray(trackersArray)
.handle((Tracker tracker, SynchronousSink<ConnectResponse> sink) -> {
ConnectToTracker.connect(tracker.getTracker(), tracker.getPort())
.subscribe(sink::next, error -> {
if (!(error instanceof SocketTimeoutException))
sink.error(error);
}, sink::complete);
})
有什么更好的请随意填写回答
由于您已经在平面图中处理 url,因此请使用 onErrorResume(e -> Mono.empty())
。这将使 flatmap 忽略错误。
编辑:在平面图中,在 lambda
Flux.fromArray(trackersArray)
.flatMap(tracker ->
ConnectToTracker.connect(tracker.getTracker(), tracker.getPort())
.onErrorResume(SocketTimeoutException.class, __ -> Mono.empty()))
也许这样做更好,它会从 SocketTimeOut 恢复,如果异常是其他的,我会选择 onError
现在我们在 3.3.2
版本中有 reactor.core.publisher.onErrorContinue()
,它允许您在某些元素为 onError()
时发送 onNext()
信号。使用log()
你会看得更清楚。
签名是 (throwable, instance)
所以如果你想记录错误的签名,这很有用。
Flux.fromIterable(aList)
.flatMap(this::xxxx)
.onErrorContinue((throwable, o) -> {
log.error("Error while processing {}. Cause: {}", o, throwable.getMessage());
})
....