如何使用 Spring WebClient 进行非阻塞调用并在所有调用完成后发送电子邮件?
How to use Spring WebClient to make non-blocking calls and send email after all calls complete?
我正在使用 Spring 的“WebClient”和项目反应器对 URL 的列表进行非阻塞调用。我的要求是:
- 在 URL 个
的列表上异步调用 GET
- 在调用每个 URL 时记录 URL
- 记录导致异常的调用的 URL
- 记录成功调用的 URL
- 记录导致非 2xx HTTP 状态的调用的 URL
- 发送包含 URL 列表的电子邮件,其中调用导致异常或非 2xx HTTP 状态
这是我的尝试:
List<Mono<ClientResponse>> restCalls = new ArrayList<>();
List<String> failedUrls = new ArrayList<>();
for (String serviceUrl : serviceUrls.getServiceUrls()) {
restCalls.add(
webClientBuilder
.build()
.get()
.uri(serviceUrl)
.exchange()
.doOnSubscribe(c -> log.info("calling service URL {}", serviceUrl))
.doOnSuccess(response -> log.info("{} success status {}", serviceUrl, response.statusCode().toString()))
.doOnError(response -> {log.info("{} error status {}", serviceUrl, response); failedUrls.add(serviceUrl);}));
}
Flux.fromIterable(restCalls)
.map((data) -> data.subscribe())
.onErrorContinue((throwable, e) -> {
log.info("Exception for URL {}", ((WebClientResponseException) throwable).getRequest().getURI());
failedUrls.add(serviceUrl);
})
.collectList()
.subscribe((data) -> {
log.info("all called");
email.send("Failed URLs are {}", failedUrls);
});
问题是在呼叫响应之前发送了电子邮件。我如何才能等到 URL 的所有调用都完成后再调用 email.send
?
我还没有测试过,但这应该可以工作
public void check() {
List<Flux<String>> restCalls = new ArrayList<>();
for (String serviceUrl : serviceUrls.getServiceUrls()) {
restCalls.add(rest.getForEntity(serviceUrl, String.class));
}
Flux.fromIterable(restCalls)
.map((data) -> data.blockFirst())
.onErrorContinue((throwable, e) -> {
((WebClientResponseException) throwable).getRequest().getURI(); // get the failing URI
// do whatever you need with the failed service
})
.collectList() // Collects all the results into a list
.subscribe((data) -> {
// from here do whatever is needed from the results
});
}
所以如果你没有这样做,你的服务调用必须是非阻塞的,所以你应该把类型变成Flux。
所以在你的 restService 里面你的方法应该是这样的
public Flux<String> getForEntity(String name) {
return this.webClient.get().uri("url", name)
.retrieve().bodyToFlux(String.class);
}
希望对大家有所帮助
restCalls.add(
webClientBuilder
.build()
.get()
.uri(serviceUrl)
.exchange()
.doOnSubscribe(c -> log.info("calling service URL {}", serviceUrl))
.doOnSuccess(response -> log.info("{} success status {}", serviceUrl, response.statusCode().toString()))
.doOnError(response -> {log.info("{} error status {}", serviceUrl, response); failedUrls.add(serviceUrl);}));
Flux.fromIterable(restCalls)
.map((data) -> data.subscribe())
.onErrorContinue((throwable, e) -> {
log.info("Exception for URL {}", ((WebClientResponseException) throwable).getRequest().getURI());
failedUrls.add(serviceUrl);
})
.collectList()
.subscribeOn(Schedulers.elastic())
.subscribe((data) -> {
log.info("all called");
email.send("Failed URLs are {}", failedUrls);
});
如评论中所述,您的示例中的主要错误是使用 'subscribe',它启动查询,但在独立于主要流量的上下文中,因此您无法取回错误或结果。
订阅是管道上的一种触发操作,它不用于链接。
这是一个完整的示例(电子邮件除外,被日志记录取代):
package fr.amanin.Whosebug;
import org.springframework.http.HttpStatus;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
public class WebfluxURLProcessing {
private static final Logger LOGGER = Logger.getLogger("example");
public static void main(String[] args) {
final List<String> urls = Arrays.asList("https://www.google.com", "https://kotlinlang.org/kotlin/is/wonderful/", "https://whosebug.com", "http://doNotexists.blabla");
final Flux<ExchangeDetails> events = Flux.fromIterable(urls)
// unwrap request async operations
.flatMap(url -> request(url))
// Add a side-effect to log results
.doOnNext(details -> log(details))
// Keep only results that show an error
.filter(details -> details.status < 0 || !HttpStatus.valueOf(details.status).is2xxSuccessful());
sendEmail(events);
}
/**
* Mock emails by collecting all events in a text and logging it.
* @param report asynchronous flow of responses
*/
private static void sendEmail(Flux<ExchangeDetails> report) {
final String formattedReport = report
.map(details -> String.format("Error on %s. status: %d. Reason: %s", details.url, details.status, details.error.getMessage()))
// collecting (or reducing, folding, etc.) allows to gather all upstream results to use them as a single value downstream.
.collect(Collectors.joining(System.lineSeparator(), "REPORT:"+System.lineSeparator(), ""))
// In a real-world scenario, replace this with a subscribe or chaining to another reactive operation.
.block();
LOGGER.info(formattedReport);
}
private static void log(ExchangeDetails details) {
if (details.status >= 0 && HttpStatus.valueOf(details.status).is2xxSuccessful()) {
LOGGER.info("Success on: "+details.url);
} else {
LOGGER.log(Level.WARNING,
"Status {0} on {1}. Reason: {2}",
new Object[]{
details.status,
details.url,
details.error == null ? "None" : details.error.getMessage()
});
}
}
private static Mono<ExchangeDetails> request(String url) {
return WebClient.create(url).get()
.retrieve()
// workaround to counter fail-fast behavior: create a special error that will be converted back to a result
.onStatus(status -> !status.is2xxSuccessful(), cr -> cr.createException().map(err -> new RequestException(cr.statusCode(), err)))
.toBodilessEntity()
.map(response -> new ExchangeDetails(url, response.getStatusCode().value(), null))
// Convert back custom error to result
.onErrorResume(RequestException.class, err -> Mono.just(new ExchangeDetails(url, err.status.value(), err.cause)))
// Convert errors that shut connection before server response (cannot connect, etc.) to a result
.onErrorResume(Exception.class, err -> Mono.just(new ExchangeDetails(url, -1, err)));
}
public static class ExchangeDetails {
final String url;
final int status;
final Exception error;
public ExchangeDetails(String url, int status, Exception error) {
this.url = url;
this.status = status;
this.error = error;
}
}
private static class RequestException extends RuntimeException {
final HttpStatus status;
final Exception cause;
public RequestException(HttpStatus status, Exception cause) {
this.status = status;
this.cause = cause;
}
}
}
我正在使用 Spring 的“WebClient”和项目反应器对 URL 的列表进行非阻塞调用。我的要求是:
- 在 URL 个 的列表上异步调用 GET
- 在调用每个 URL 时记录 URL
- 记录导致异常的调用的 URL
- 记录成功调用的 URL
- 记录导致非 2xx HTTP 状态的调用的 URL
- 发送包含 URL 列表的电子邮件,其中调用导致异常或非 2xx HTTP 状态
这是我的尝试:
List<Mono<ClientResponse>> restCalls = new ArrayList<>();
List<String> failedUrls = new ArrayList<>();
for (String serviceUrl : serviceUrls.getServiceUrls()) {
restCalls.add(
webClientBuilder
.build()
.get()
.uri(serviceUrl)
.exchange()
.doOnSubscribe(c -> log.info("calling service URL {}", serviceUrl))
.doOnSuccess(response -> log.info("{} success status {}", serviceUrl, response.statusCode().toString()))
.doOnError(response -> {log.info("{} error status {}", serviceUrl, response); failedUrls.add(serviceUrl);}));
}
Flux.fromIterable(restCalls)
.map((data) -> data.subscribe())
.onErrorContinue((throwable, e) -> {
log.info("Exception for URL {}", ((WebClientResponseException) throwable).getRequest().getURI());
failedUrls.add(serviceUrl);
})
.collectList()
.subscribe((data) -> {
log.info("all called");
email.send("Failed URLs are {}", failedUrls);
});
问题是在呼叫响应之前发送了电子邮件。我如何才能等到 URL 的所有调用都完成后再调用 email.send
?
我还没有测试过,但这应该可以工作
public void check() {
List<Flux<String>> restCalls = new ArrayList<>();
for (String serviceUrl : serviceUrls.getServiceUrls()) {
restCalls.add(rest.getForEntity(serviceUrl, String.class));
}
Flux.fromIterable(restCalls)
.map((data) -> data.blockFirst())
.onErrorContinue((throwable, e) -> {
((WebClientResponseException) throwable).getRequest().getURI(); // get the failing URI
// do whatever you need with the failed service
})
.collectList() // Collects all the results into a list
.subscribe((data) -> {
// from here do whatever is needed from the results
});
}
所以如果你没有这样做,你的服务调用必须是非阻塞的,所以你应该把类型变成Flux。 所以在你的 restService 里面你的方法应该是这样的
public Flux<String> getForEntity(String name) {
return this.webClient.get().uri("url", name)
.retrieve().bodyToFlux(String.class);
}
希望对大家有所帮助
restCalls.add(
webClientBuilder
.build()
.get()
.uri(serviceUrl)
.exchange()
.doOnSubscribe(c -> log.info("calling service URL {}", serviceUrl))
.doOnSuccess(response -> log.info("{} success status {}", serviceUrl, response.statusCode().toString()))
.doOnError(response -> {log.info("{} error status {}", serviceUrl, response); failedUrls.add(serviceUrl);}));
Flux.fromIterable(restCalls)
.map((data) -> data.subscribe())
.onErrorContinue((throwable, e) -> {
log.info("Exception for URL {}", ((WebClientResponseException) throwable).getRequest().getURI());
failedUrls.add(serviceUrl);
})
.collectList()
.subscribeOn(Schedulers.elastic())
.subscribe((data) -> {
log.info("all called");
email.send("Failed URLs are {}", failedUrls);
});
如评论中所述,您的示例中的主要错误是使用 'subscribe',它启动查询,但在独立于主要流量的上下文中,因此您无法取回错误或结果。
订阅是管道上的一种触发操作,它不用于链接。
这是一个完整的示例(电子邮件除外,被日志记录取代):
package fr.amanin.Whosebug;
import org.springframework.http.HttpStatus;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
public class WebfluxURLProcessing {
private static final Logger LOGGER = Logger.getLogger("example");
public static void main(String[] args) {
final List<String> urls = Arrays.asList("https://www.google.com", "https://kotlinlang.org/kotlin/is/wonderful/", "https://whosebug.com", "http://doNotexists.blabla");
final Flux<ExchangeDetails> events = Flux.fromIterable(urls)
// unwrap request async operations
.flatMap(url -> request(url))
// Add a side-effect to log results
.doOnNext(details -> log(details))
// Keep only results that show an error
.filter(details -> details.status < 0 || !HttpStatus.valueOf(details.status).is2xxSuccessful());
sendEmail(events);
}
/**
* Mock emails by collecting all events in a text and logging it.
* @param report asynchronous flow of responses
*/
private static void sendEmail(Flux<ExchangeDetails> report) {
final String formattedReport = report
.map(details -> String.format("Error on %s. status: %d. Reason: %s", details.url, details.status, details.error.getMessage()))
// collecting (or reducing, folding, etc.) allows to gather all upstream results to use them as a single value downstream.
.collect(Collectors.joining(System.lineSeparator(), "REPORT:"+System.lineSeparator(), ""))
// In a real-world scenario, replace this with a subscribe or chaining to another reactive operation.
.block();
LOGGER.info(formattedReport);
}
private static void log(ExchangeDetails details) {
if (details.status >= 0 && HttpStatus.valueOf(details.status).is2xxSuccessful()) {
LOGGER.info("Success on: "+details.url);
} else {
LOGGER.log(Level.WARNING,
"Status {0} on {1}. Reason: {2}",
new Object[]{
details.status,
details.url,
details.error == null ? "None" : details.error.getMessage()
});
}
}
private static Mono<ExchangeDetails> request(String url) {
return WebClient.create(url).get()
.retrieve()
// workaround to counter fail-fast behavior: create a special error that will be converted back to a result
.onStatus(status -> !status.is2xxSuccessful(), cr -> cr.createException().map(err -> new RequestException(cr.statusCode(), err)))
.toBodilessEntity()
.map(response -> new ExchangeDetails(url, response.getStatusCode().value(), null))
// Convert back custom error to result
.onErrorResume(RequestException.class, err -> Mono.just(new ExchangeDetails(url, err.status.value(), err.cause)))
// Convert errors that shut connection before server response (cannot connect, etc.) to a result
.onErrorResume(Exception.class, err -> Mono.just(new ExchangeDetails(url, -1, err)));
}
public static class ExchangeDetails {
final String url;
final int status;
final Exception error;
public ExchangeDetails(String url, int status, Exception error) {
this.url = url;
this.status = status;
this.error = error;
}
}
private static class RequestException extends RuntimeException {
final HttpStatus status;
final Exception cause;
public RequestException(HttpStatus status, Exception cause) {
this.status = status;
this.cause = cause;
}
}
}