Spring Boot:如何使用 WebClient 而不是 RestTemplate 来执行非阻塞和异步调用
Springboot : How to use WebClient instead of RestTemplate for Performing Non blocking and Asynchronous calls
我有一个使用 Springboot Resttemplate 的 springboot 项目。我们已经从 1.5.3 迁移到 springboot 2.0.1,我们正在努力
其余调用使用 WebClient 异步调用。我们曾经使用 Resttemplate 处理接收到的字符串,如下所示。但仅 WebClient returns
Mono 或 Flux 中的数据。如何获取字符串形式的数据。已经尝试过 block() 方法,但它执行异步调用。
@Retryable(maxAttempts = 4, value = java.net.ConnectException.class,
backoff = @Backoff(delay = 3000, multiplier = 2))
public Mono<String> getResponse(String url) {
return webClient.get().uri(urlForCurrent).accept(APPLICATION_JSON)
.retrieve()
.bodyToMono(String.class);
}
使用 RestTemplate 呈现数据流
- 控制器收到客户端调用
- 提供者获取字符串格式的数据
- 提供商处理字符串
- 数据交给控制器
Controller.java
@RequestMapping(value = traffic/, method = RequestMethod.GET,
produces = MediaType.APPLICATION_JSON_VALUE)
public String getTraffic(@RequestParam("place") String location) throws InterruptedException, ExecutionException {
String trafficJSON = Provider.getTrafficJSON(location)
return trafficJSON;
}
Provider.java
public String getTrafficJSON(String location) {
String url = ----;
ResponseEntity<String> response = dataFetcher.getResponse(url);
/// RESPONSEBODY IS READ AS STRING AND IT NEEDS TO BE PROCESSED
if (null != response {
return parser.transformJSON(response.getBody(), params);
}
return null;
}
DataFetcher.java
@Retryable(maxAttempts = 4,
value = java.net.ConnectException.class,
backoff = @Backoff(delay = 3000, multiplier = 2))
public ResponseEntity<String> getResponse(String url) {
/* ----------------------- */
return getRestTemplate().getForEntity(urlForCurrent, String.class);
}
第一步是使用 baseUrl 构建 WebClient
对象;
WebClient webClient = WebClient.builder()
.baseUrl("http://localhost:8080/api") //baseUrl
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.build();
然后选择方法并附加路径以及请求变量或正文负载。
ResponseSpec responseSpec = webClient
.get()
.uri(uriBuilder -> uriBuilder.path("/findById") //additional path
.queryParam("id", id).build())
.retrieve()
.onStatus(HttpStatus::is4xxClientError, response -> Mono.error(new CustomRuntimeException("Error")));
用bodyToMono
的block()
函数等待响应。如果你想将响应作为字符串,你可以使用 google 的 gson 库将其转换。
Object response = responseSpec.bodyToMono(Object.class).block();
Gson gson = new Gson();
String str = gson.toJson(response);
如果您不想知道 api 呼叫的状态,您可以像下面那样做。
webClient
.post()
.uri(uri -> uri.path("/save").build())
.body( BodyInserters.fromObject(payload) )
.exchange().subscribe();
首先要了解的是,如果您需要调用 .block()
,您不妨坚持使用 RestTemplate
,使用 WebClient 将一无所获。
如果您想从使用 WebClient 中获益,则需要开始从反应的角度思考。反应过程实际上只是一系列步骤,每个步骤的输入都是前一步的输出。当收到请求时,您的代码会创建步骤序列,returns 会立即释放 http 线程。当上一步的输入可用时,框架然后使用工作线程池执行每个步骤。
这样做的好处是在接受竞争请求方面的能力得到了巨大的提升,而付出的代价却很小,那就是不得不重新考虑您编写代码的方式。您的应用程序只需要一个非常小的 http 线程池和另一个非常小的工作线程池。
当您的控制器方法 returning a Mono
或 Flux
时,您做对了,不需要调用 block()
。
最简单的形式如下:
@GetMapping(value = "endpoint", produces = MediaType.TEXT_PLAIN_VALUE)
@ResponseBody
@ResponseStatus(OK)
public Mono<String> controllerMethod() {
final UriComponentsBuilder builder =
UriComponentsBuilder.fromHttpUrl("http://base.url/" + "endpoint")
.queryParam("param1", "value");
return webClient
.get()
.uri(builder.build().encode().toUri())
.accept(APPLICATION_JSON_UTF8)
.retrieve()
.bodyToMono(String.class)
.retry(4)
.doOnError(e -> LOG.error("Boom!", e))
.map(s -> {
// This is your transformation step.
// Map is synchronous so will run in the thread that processed the response.
// Alternatively use flatMap (asynchronous) if the step will be long running.
// For example, if it needs to make a call out to the database to do the transformation.
return s.toLowerCase();
});
}
转向反应式思考是一个相当大的范式转变,但值得付出努力。坚持住,一旦您能够在整个应用程序中完全没有阻塞代码,这真的不是那么困难。构建步骤并 return 它们。然后让框架管理步骤的执行。
如果有任何不清楚的地方,我们很乐意提供更多指导。
记得玩得开心:)
由于存在很多误解,所以在这里我要澄清一些事情。
Spring 已经正式声明 RestTemplate
在 maintenence mode
中,所以如果可以的话,如果你想尽可能地成为未来的证明,请使用 WebClient
。
中所述
NOTE: As of 5.0 this class is in maintenance mode, with only minor requests for changes and bugs to be accepted going forward. Please, consider using the org.springframework.web.reactive.client.WebClient
which has a more modern API and supports sync, async, and streaming scenarios.
非反应性应用程序
如果您的应用程序是非反应性应用程序(不是 return 向调用客户端发送通量或单声道),如果您需要该值,则必须使用 block()
。您当然可以在您的应用程序内部使用 Mono
或 Flux
但最后您必须调用 block()
以获得您需要 return 给调用客户端的具体值.
非反应性应用程序使用 tomcat
、undertow
作为底层服务器实现,它遵循 servlet 规范,因此它将为每个请求分配 1 个线程,因此您不会获得性能提升使用响应式应用程序。
响应式应用程序
另一方面,如果您有一个响应式应用程序,则在任何情况下都不应在您的应用程序中调用 block()
。阻塞正是它所说的,它会阻塞一个线程并阻止该线程执行直到它可以继续,这在反应世界中是不好的。
您也不应该在您的应用程序中调用 subscribe
,除非您的应用程序是响应的最终消费者。例如,如果您正在调用 api 来获取数据并写入您的应用程序所连接的数据库。您的后端应用程序是最终消费者。如果外部客户端正在调用您的后端(例如反应、angular 应用程序、移动客户端等),则外部客户端是最终消费者,并且是订阅者。不是你。
这里的底层默认服务器实现是一个 netty
服务器,它是一个非 servlet、基于事件的服务器,它将 不会 为每个请求分配一个线程,即服务器本身与线程无关,任何可用的线程都将在任何请求期间随时处理任何事情。
webflux documentation 明确指出 servlet 3.1+ 支持的服务器 tomcat 和 jetty 都可以与 webflux 以及非 serlet 服务器 netty 和 undertow 一起使用。
我怎么知道我有什么应用程序?
Spring 声明如果在 class 路径上同时具有 spring-web
和 spring-webflux
,应用程序将支持 spring-web
并且默认启动具有底层 tomcat 服务器的非反应性应用程序。
如果需要,可以手动覆盖此行为,如 spring 所述。
Adding both spring-boot-starter-web
and spring-boot-starter-webflux
modules in your application results in Spring Boot auto-configuring Spring MVC, not WebFlux. This behavior has been chosen because many Spring developers add spring-boot-starter-webflux
to their Spring MVC application to use the reactive WebClient. You can still enforce your choice by setting the chosen application type to SpringApplication.setWebApplicationType(WebApplicationType.REACTIVE)
.
The “Spring WebFlux Framework”
那么如何按照问题提供的代码实现WebClient呢?
@Retryable(maxAttempts = 4,
value = java.net.ConnectException.class,
backoff = @Backoff(delay = 3000, multiplier = 2))
public Mono<String> getResponse(String url) {
return webClient.get()
.uri(url)
.exchange()
.flatMap(response -> response.toEntity(String.class));
}
我会说这是最简单、侵入性最低的实现。您当然需要在 @Bean
中构建一个合适的网络客户端并将其自动连接到 class.
据我了解,您的 Spring 启动网络应用程序中需要 Non-blocking/asynchronous 和网络 servlet/sync api 调用。我有完全相同的问题。答案很简单。您需要同时拥有 spring-boot-starter-web 和 spring-boot-starter-webflux 依赖项。
然后,当您想要进行 async/non-blocking 调用时,请确保按如下方式调用 toFuture() 方法。
configWebClient.post().uri("/alarm/check")
.body(BodyInserters.fromFormData(formData))
.exchangeToMono(res -> {
if (res.statusCode().equals(HttpStatus.OK)) {
return res.bodyToMono(Boolean.class);
} else {
return res.createException().flatMap(Mono::error);
}
}).toFuture();
我有一个使用 Springboot Resttemplate 的 springboot 项目。我们已经从 1.5.3 迁移到 springboot 2.0.1,我们正在努力 其余调用使用 WebClient 异步调用。我们曾经使用 Resttemplate 处理接收到的字符串,如下所示。但仅 WebClient returns Mono 或 Flux 中的数据。如何获取字符串形式的数据。已经尝试过 block() 方法,但它执行异步调用。
@Retryable(maxAttempts = 4, value = java.net.ConnectException.class,
backoff = @Backoff(delay = 3000, multiplier = 2))
public Mono<String> getResponse(String url) {
return webClient.get().uri(urlForCurrent).accept(APPLICATION_JSON)
.retrieve()
.bodyToMono(String.class);
}
使用 RestTemplate 呈现数据流
- 控制器收到客户端调用
- 提供者获取字符串格式的数据
- 提供商处理字符串
- 数据交给控制器
Controller.java
@RequestMapping(value = traffic/, method = RequestMethod.GET,
produces = MediaType.APPLICATION_JSON_VALUE)
public String getTraffic(@RequestParam("place") String location) throws InterruptedException, ExecutionException {
String trafficJSON = Provider.getTrafficJSON(location)
return trafficJSON;
}
Provider.java
public String getTrafficJSON(String location) {
String url = ----;
ResponseEntity<String> response = dataFetcher.getResponse(url);
/// RESPONSEBODY IS READ AS STRING AND IT NEEDS TO BE PROCESSED
if (null != response {
return parser.transformJSON(response.getBody(), params);
}
return null;
}
DataFetcher.java
@Retryable(maxAttempts = 4,
value = java.net.ConnectException.class,
backoff = @Backoff(delay = 3000, multiplier = 2))
public ResponseEntity<String> getResponse(String url) {
/* ----------------------- */
return getRestTemplate().getForEntity(urlForCurrent, String.class);
}
第一步是使用 baseUrl 构建 WebClient
对象;
WebClient webClient = WebClient.builder()
.baseUrl("http://localhost:8080/api") //baseUrl
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.build();
然后选择方法并附加路径以及请求变量或正文负载。
ResponseSpec responseSpec = webClient
.get()
.uri(uriBuilder -> uriBuilder.path("/findById") //additional path
.queryParam("id", id).build())
.retrieve()
.onStatus(HttpStatus::is4xxClientError, response -> Mono.error(new CustomRuntimeException("Error")));
用bodyToMono
的block()
函数等待响应。如果你想将响应作为字符串,你可以使用 google 的 gson 库将其转换。
Object response = responseSpec.bodyToMono(Object.class).block();
Gson gson = new Gson();
String str = gson.toJson(response);
如果您不想知道 api 呼叫的状态,您可以像下面那样做。
webClient
.post()
.uri(uri -> uri.path("/save").build())
.body( BodyInserters.fromObject(payload) )
.exchange().subscribe();
首先要了解的是,如果您需要调用 .block()
,您不妨坚持使用 RestTemplate
,使用 WebClient 将一无所获。
如果您想从使用 WebClient 中获益,则需要开始从反应的角度思考。反应过程实际上只是一系列步骤,每个步骤的输入都是前一步的输出。当收到请求时,您的代码会创建步骤序列,returns 会立即释放 http 线程。当上一步的输入可用时,框架然后使用工作线程池执行每个步骤。
这样做的好处是在接受竞争请求方面的能力得到了巨大的提升,而付出的代价却很小,那就是不得不重新考虑您编写代码的方式。您的应用程序只需要一个非常小的 http 线程池和另一个非常小的工作线程池。
当您的控制器方法 returning a Mono
或 Flux
时,您做对了,不需要调用 block()
。
最简单的形式如下:
@GetMapping(value = "endpoint", produces = MediaType.TEXT_PLAIN_VALUE)
@ResponseBody
@ResponseStatus(OK)
public Mono<String> controllerMethod() {
final UriComponentsBuilder builder =
UriComponentsBuilder.fromHttpUrl("http://base.url/" + "endpoint")
.queryParam("param1", "value");
return webClient
.get()
.uri(builder.build().encode().toUri())
.accept(APPLICATION_JSON_UTF8)
.retrieve()
.bodyToMono(String.class)
.retry(4)
.doOnError(e -> LOG.error("Boom!", e))
.map(s -> {
// This is your transformation step.
// Map is synchronous so will run in the thread that processed the response.
// Alternatively use flatMap (asynchronous) if the step will be long running.
// For example, if it needs to make a call out to the database to do the transformation.
return s.toLowerCase();
});
}
转向反应式思考是一个相当大的范式转变,但值得付出努力。坚持住,一旦您能够在整个应用程序中完全没有阻塞代码,这真的不是那么困难。构建步骤并 return 它们。然后让框架管理步骤的执行。
如果有任何不清楚的地方,我们很乐意提供更多指导。
记得玩得开心:)
由于存在很多误解,所以在这里我要澄清一些事情。
Spring 已经正式声明 RestTemplate
在 maintenence mode
中,所以如果可以的话,如果你想尽可能地成为未来的证明,请使用 WebClient
。
NOTE: As of 5.0 this class is in maintenance mode, with only minor requests for changes and bugs to be accepted going forward. Please, consider using the
org.springframework.web.reactive.client.WebClient
which has a more modern API and supports sync, async, and streaming scenarios.
非反应性应用程序
如果您的应用程序是非反应性应用程序(不是 return 向调用客户端发送通量或单声道),如果您需要该值,则必须使用 block()
。您当然可以在您的应用程序内部使用 Mono
或 Flux
但最后您必须调用 block()
以获得您需要 return 给调用客户端的具体值.
非反应性应用程序使用 tomcat
、undertow
作为底层服务器实现,它遵循 servlet 规范,因此它将为每个请求分配 1 个线程,因此您不会获得性能提升使用响应式应用程序。
响应式应用程序
另一方面,如果您有一个响应式应用程序,则在任何情况下都不应在您的应用程序中调用 block()
。阻塞正是它所说的,它会阻塞一个线程并阻止该线程执行直到它可以继续,这在反应世界中是不好的。
您也不应该在您的应用程序中调用 subscribe
,除非您的应用程序是响应的最终消费者。例如,如果您正在调用 api 来获取数据并写入您的应用程序所连接的数据库。您的后端应用程序是最终消费者。如果外部客户端正在调用您的后端(例如反应、angular 应用程序、移动客户端等),则外部客户端是最终消费者,并且是订阅者。不是你。
这里的底层默认服务器实现是一个 netty
服务器,它是一个非 servlet、基于事件的服务器,它将 不会 为每个请求分配一个线程,即服务器本身与线程无关,任何可用的线程都将在任何请求期间随时处理任何事情。
webflux documentation 明确指出 servlet 3.1+ 支持的服务器 tomcat 和 jetty 都可以与 webflux 以及非 serlet 服务器 netty 和 undertow 一起使用。
我怎么知道我有什么应用程序?
Spring 声明如果在 class 路径上同时具有 spring-web
和 spring-webflux
,应用程序将支持 spring-web
并且默认启动具有底层 tomcat 服务器的非反应性应用程序。
如果需要,可以手动覆盖此行为,如 spring 所述。
Adding both
spring-boot-starter-web
andspring-boot-starter-webflux
modules in your application results in Spring Boot auto-configuring Spring MVC, not WebFlux. This behavior has been chosen because many Spring developers addspring-boot-starter-webflux
to their Spring MVC application to use the reactive WebClient. You can still enforce your choice by setting the chosen application type toSpringApplication.setWebApplicationType(WebApplicationType.REACTIVE)
.
The “Spring WebFlux Framework”
那么如何按照问题提供的代码实现WebClient呢?
@Retryable(maxAttempts = 4,
value = java.net.ConnectException.class,
backoff = @Backoff(delay = 3000, multiplier = 2))
public Mono<String> getResponse(String url) {
return webClient.get()
.uri(url)
.exchange()
.flatMap(response -> response.toEntity(String.class));
}
我会说这是最简单、侵入性最低的实现。您当然需要在 @Bean
中构建一个合适的网络客户端并将其自动连接到 class.
据我了解,您的 Spring 启动网络应用程序中需要 Non-blocking/asynchronous 和网络 servlet/sync api 调用。我有完全相同的问题。答案很简单。您需要同时拥有 spring-boot-starter-web 和 spring-boot-starter-webflux 依赖项。 然后,当您想要进行 async/non-blocking 调用时,请确保按如下方式调用 toFuture() 方法。
configWebClient.post().uri("/alarm/check")
.body(BodyInserters.fromFormData(formData))
.exchangeToMono(res -> {
if (res.statusCode().equals(HttpStatus.OK)) {
return res.bodyToMono(Boolean.class);
} else {
return res.createException().flatMap(Mono::error);
}
}).toFuture();