如果通量为空,如何发出过滤掉的错误
How to emit filtered out error if flux is empty
我有同步代码,我想用 reactor 实现非阻塞。
我想并行调用不同的 URI,调用可以 return 一个响应,一个错误或者什么都没有。
有3个案例:
- 一个请求return是一个响应,我return它不需要等待其他请求完成。
如果其他请求 return 早些时候出错,我会删除错误
- 至少有一个请求 return 出错,没有其他请求 return 响应,我 return 出错
- 所有请求return什么都没有(没有响应,没有错误),我return什么都没有
我已经以同步方式完成了此操作:
AtomicReference<WebClientResponseException> responseException = new AtomicReference<>();
String responseBody = Flux.fromIterable(uriList)
.flatMap(url -> repo.sendRequest(uri))
// sendRequest returns a Mono that either emit a response, an error or nothing
.onErrorContinue(WebClientResponseException.class, (error, element) -> {
var webclientError = (WebClientResponseException) error;
responseException.set(webclientError);
})
.blockFirst();
return Pair.of(responseBody, responseException.get());
我想删除阻塞调用和 return 一个 Mono
据我了解,我在某种程度上保持了发生错误的状态,但我无法保持反应堆的状态。
我如何跟踪发生的错误但不发出它们,因为我想查看其他请求是否稍后发出结果?
这个版本有效吗?
AtomicReference<WebClientResponseException> responseException = new AtomicReference<>();
return Flux.fromIterable(uriList)
.flatMap(url -> repo.sendRequest(uri))
// sendRequest returns a Mono that either emit a response, an error or nothing
.onErrorContinue(WebClientResponseException.class, (error, element) -> {
var webclientError = (WebClientResponseException) error;
responseException.set(webclientError);
})
.next()
.switchIfEmpty(Mono.defer(() -> Mono.error(responseException.get())));
AtomicReference 会像闭包一样被关闭吗?
我认为flatMapDelayError
可能会实现你想要的,看这个例子:
int concurrency = 10;
int prefetch = 1;
Flux.just(
Mono.error(new IOException("error")).delaySubscription(Duration.ofSeconds(2)),
Mono.just("fast").delaySubscription(Duration.ofSeconds(4)),
Mono.just("slow").delaySubscription(Duration.ofSeconds(6)))
.flatMapDelayError(
request -> request,
concurrency,
prefetch)
.next()
.doOnNext(result -> System.out.println("Result: " + result))
在此示例中,error
首先完成,但 -DelayError
运算符持有它,然后 fast
完成并作为结果发出。最后 slow
永远不会完成,因为 .next()
因为我们有结果而取消了剩余的请求。
我有同步代码,我想用 reactor 实现非阻塞。
我想并行调用不同的 URI,调用可以 return 一个响应,一个错误或者什么都没有。
有3个案例:
- 一个请求return是一个响应,我return它不需要等待其他请求完成。 如果其他请求 return 早些时候出错,我会删除错误
- 至少有一个请求 return 出错,没有其他请求 return 响应,我 return 出错
- 所有请求return什么都没有(没有响应,没有错误),我return什么都没有
我已经以同步方式完成了此操作:
AtomicReference<WebClientResponseException> responseException = new AtomicReference<>();
String responseBody = Flux.fromIterable(uriList)
.flatMap(url -> repo.sendRequest(uri))
// sendRequest returns a Mono that either emit a response, an error or nothing
.onErrorContinue(WebClientResponseException.class, (error, element) -> {
var webclientError = (WebClientResponseException) error;
responseException.set(webclientError);
})
.blockFirst();
return Pair.of(responseBody, responseException.get());
我想删除阻塞调用和 return 一个 Mono
据我了解,我在某种程度上保持了发生错误的状态,但我无法保持反应堆的状态。
我如何跟踪发生的错误但不发出它们,因为我想查看其他请求是否稍后发出结果?
这个版本有效吗?
AtomicReference<WebClientResponseException> responseException = new AtomicReference<>();
return Flux.fromIterable(uriList)
.flatMap(url -> repo.sendRequest(uri))
// sendRequest returns a Mono that either emit a response, an error or nothing
.onErrorContinue(WebClientResponseException.class, (error, element) -> {
var webclientError = (WebClientResponseException) error;
responseException.set(webclientError);
})
.next()
.switchIfEmpty(Mono.defer(() -> Mono.error(responseException.get())));
AtomicReference 会像闭包一样被关闭吗?
我认为flatMapDelayError
可能会实现你想要的,看这个例子:
int concurrency = 10;
int prefetch = 1;
Flux.just(
Mono.error(new IOException("error")).delaySubscription(Duration.ofSeconds(2)),
Mono.just("fast").delaySubscription(Duration.ofSeconds(4)),
Mono.just("slow").delaySubscription(Duration.ofSeconds(6)))
.flatMapDelayError(
request -> request,
concurrency,
prefetch)
.next()
.doOnNext(result -> System.out.println("Result: " + result))
在此示例中,error
首先完成,但 -DelayError
运算符持有它,然后 fast
完成并作为结果发出。最后 slow
永远不会完成,因为 .next()
因为我们有结果而取消了剩余的请求。