如果通量为空,如何发出过滤掉的错误

How to emit filtered out error if flux is empty

我有同步代码,我想用 reactor 实现非阻塞。

我想并行调用不同的 URI,调用可以 return 一个响应,一个错误或者什么都没有。

有3个案例:

我已经以同步方式完成了此操作:

    AtomicReference<WebClientResponseException> responseException = new AtomicReference<>();
    
    String responseBody = Flux.fromIterable(uriList)
            .flatMap(url -> repo.sendRequest(uri)) 
            // sendRequest  returns a Mono that either emit a response, an error or nothing
            .onErrorContinue(WebClientResponseException.class, (error, element) -> {
                var webclientError = (WebClientResponseException) error;
                responseException.set(webclientError);
            })
            .blockFirst();
    
    return Pair.of(responseBody, responseException.get());

我想删除阻塞调用和 return 一个 Mono

据我了解,我在某种程度上保持了发生错误的状态,但我无法保持反应堆的状态。

我如何跟踪发生的错误但不发出它们,因为我想查看其他请求是否稍后发出结果?

这个版本有效吗?

AtomicReference<WebClientResponseException> responseException = new AtomicReference<>();
    
    return Flux.fromIterable(uriList)
            .flatMap(url -> repo.sendRequest(uri)) 
            // sendRequest  returns a Mono that either emit a response, an error or nothing
            .onErrorContinue(WebClientResponseException.class, (error, element) -> {
                var webclientError = (WebClientResponseException) error;
                responseException.set(webclientError);
            })
            .next()
            .switchIfEmpty(Mono.defer(() -> Mono.error(responseException.get())));

AtomicReference 会像闭包一样被关闭吗?

我认为flatMapDelayError可能会实现你想要的,看这个例子:

int concurrency = 10;
int prefetch = 1;

Flux.just(
        Mono.error(new IOException("error")).delaySubscription(Duration.ofSeconds(2)),
        Mono.just("fast").delaySubscription(Duration.ofSeconds(4)),
        Mono.just("slow").delaySubscription(Duration.ofSeconds(6)))
    .flatMapDelayError(
        request -> request,
        concurrency,
        prefetch)
    .next()
    .doOnNext(result -> System.out.println("Result: " + result))

在此示例中,error 首先完成,但 -DelayError 运算符持有它,然后 fast 完成并作为结果发出。最后 slow 永远不会完成,因为 .next() 因为我们有结果而取消了剩余的请求。