Reactor 有没有办法忽略错误信号?

Is there a way in Reactor to ignore error signals?

我有一个包含多个 URL 和端口的数组。对于他们每个人,我都需要发送和接收一些东西:

Flux.fromArray(trackersArray)
    .flatMap(tracker -> 
               ConnectToTracker.connect(tracker.getTracker(), tracker.getPort()))

我与 UDP 中的服务器通信,所以我无法判断服务器是否处于活动状态,除非我发送一条“根据某些规则集需要响应”的消息

ConnectToTracker.connect 可能会在服务器响应时发送 onNext 信号,或者在服务器未响应(SocketTimeOutException)或任何其他情况下发送 onError 信号其他故障(一般IOException)。

我不想终止 flux,例如,如果 onError 信号等于 SocketTimeOutException相反,我想尝试与我得到的每个跟踪器进行通信。

This link 包含我可以用来处理错误的所有操作 但不能忽略它们。

如果这很重要,我正在使用 Reactor 3

更新:

我做了一个丑陋的技巧,但有效:

Flux.fromArray(trackersArray)
        .handle((Tracker tracker, SynchronousSink<ConnectResponse> sink) -> {
            ConnectToTracker.connect(tracker.getTracker(), tracker.getPort())
                    .subscribe(sink::next, error -> {
                        if (!(error instanceof SocketTimeoutException))
                            sink.error(error);
                    }, sink::complete);
        })

有什么更好的请随意填写回答

由于您已经在平面图中处理 url,因此请使用 onErrorResume(e -> Mono.empty())。这将使 flatmap 忽略错误。 编辑:在平面图中,在 lambda

的右侧
Flux.fromArray(trackersArray)
.flatMap(tracker -> 
           ConnectToTracker.connect(tracker.getTracker(), tracker.getPort())
                .onErrorResume(SocketTimeoutException.class, __ -> Mono.empty()))

也许这样做更好,它会从 SocketTimeOut 恢复,如果异常是其他的,我会选择 onError

现在我们在 3.3.2 版本中有 reactor.core.publisher.onErrorContinue(),它允许您在某些元素为 onError() 时发送 onNext() 信号。使用log()你会看得更清楚。

签名是 (throwable, instance) 所以如果你想记录错误的签名,这很有用。

Flux.fromIterable(aList)
    .flatMap(this::xxxx)
    .onErrorContinue((throwable, o) -> {
        log.error("Error while processing {}. Cause: {}", o, throwable.getMessage());
})
    ....