Spring Boot:如何使用 WebClient 而不是 RestTemplate 来执行非阻塞和异步调用

Springboot : How to use WebClient instead of RestTemplate for Performing Non blocking and Asynchronous calls

我有一个使用 Springboot Resttemplate 的 springboot 项目。我们已经从 1.5.3 迁移到 springboot 2.0.1,我们正在努力 其余调用使用 WebClient 异步调用。我们曾经使用 Resttemplate 处理接收到的字符串,如下所示。但仅 WebClient returns Mono 或 Flux 中的数据。如何获取字符串形式的数据。已经尝试过 block() 方法,但它执行异步调用。

@Retryable(maxAttempts = 4, value = java.net.ConnectException.class,
           backoff = @Backoff(delay = 3000, multiplier = 2))
public Mono<String> getResponse(String url) {
    return webClient.get().uri(urlForCurrent).accept(APPLICATION_JSON)
                    .retrieve()
                    .bodyToMono(String.class);
}

使用 RestTemplate 呈现数据流

  1. 控制器收到客户端调用
  2. 提供者获取字符串格式的数据
  3. 提供商处理字符串
  4. 数据交给控制器

Controller.java

@RequestMapping(value = traffic/, method = RequestMethod.GET,
                produces = MediaType.APPLICATION_JSON_VALUE)
public String getTraffic(@RequestParam("place") String location) throws InterruptedException, ExecutionException {
    String trafficJSON = Provider.getTrafficJSON(location)
    return trafficJSON;
}

Provider.java

public String getTrafficJSON(String location) {
    String url = ----;

    ResponseEntity<String> response = dataFetcher.getResponse(url);

    /// RESPONSEBODY IS READ AS STRING AND IT NEEDS TO BE PROCESSED
    if (null != response {
        return parser.transformJSON(response.getBody(), params);
    }

    return null;
}

DataFetcher.java

@Retryable(maxAttempts = 4,
           value = java.net.ConnectException.class,
           backoff = @Backoff(delay = 3000, multiplier = 2))
public ResponseEntity<String> getResponse(String url) {
    /* ----------------------- */
    return getRestTemplate().getForEntity(urlForCurrent, String.class);
}

第一步是使用 baseUrl 构建 WebClient 对象;

WebClient webClient = WebClient.builder()
    .baseUrl("http://localhost:8080/api") //baseUrl
    .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
    .build();

然后选择方法并附加路径以及请求变量或正文负载。

ResponseSpec responseSpec = webClient
    .get()
    .uri(uriBuilder -> uriBuilder.path("/findById") //additional path
        .queryParam("id", id).build())
    .retrieve()
    .onStatus(HttpStatus::is4xxClientError, response -> Mono.error(new CustomRuntimeException("Error")));

bodyToMonoblock()函数等待响应。如果你想将响应作为字符串,你可以使用 google 的 gson 库将其转换。

Object response = responseSpec.bodyToMono(Object.class).block();
Gson gson = new Gson();
String str = gson.toJson(response);

如果您不想知道 api 呼叫的状态,您可以像下面那样做。

webClient
    .post()
    .uri(uri -> uri.path("/save").build())
    .body( BodyInserters.fromObject(payload) )
    .exchange().subscribe();

首先要了解的是,如果您需要调用 .block(),您不妨坚持使用 RestTemplate,使用 WebClient 将一无所获。

如果您想从使用 WebClient 中获益,则需要开始从反应的角度思考。反应过程实际上只是一系列步骤,每个步骤的输入都是前一步的输出。当收到请求时,您的代码会创建步骤序列,returns 会立即释放 http 线程。当上一步的输入可用时,框架然后使用工作线程池执行每个步骤。

这样做的好处是在接受竞争请求方面的能力得到了巨大的提升,而付出的代价却很小,那就是不得不重新考虑您编写代码的方式。您的应用程序只需要一个非常小的 http 线程池和另一个非常小的工作线程池。

当您的控制器方法 returning a MonoFlux 时,您做对了,不需要调用 block()

最简单的形式如下:

@GetMapping(value = "endpoint", produces = MediaType.TEXT_PLAIN_VALUE)
@ResponseBody
@ResponseStatus(OK)
public Mono<String> controllerMethod() {

    final UriComponentsBuilder builder =
            UriComponentsBuilder.fromHttpUrl("http://base.url/" + "endpoint")
                    .queryParam("param1", "value");

    return webClient
            .get()
            .uri(builder.build().encode().toUri())
            .accept(APPLICATION_JSON_UTF8)
            .retrieve()
            .bodyToMono(String.class)
            .retry(4)
            .doOnError(e -> LOG.error("Boom!", e))
            .map(s -> {

                // This is your transformation step. 
                // Map is synchronous so will run in the thread that processed the response. 
                // Alternatively use flatMap (asynchronous) if the step will be long running. 
                // For example, if it needs to make a call out to the database to do the transformation.

                return s.toLowerCase();
            });
}

转向反应式思考是一个相当大的范式转变,但值得付出努力。坚持住,一旦您能够在整个应用程序中完全没有阻塞代码,这真的不是那么困难。构建步骤并 return 它们。然后让框架管理步骤的执行。

如果有任何不清楚的地方,我们很乐意提供更多指导。

记得玩得开心:)

由于存在很多误解,所以在这里我要澄清一些事情。

Spring 已经正式声明 RestTemplatemaintenence mode 中,所以如果可以的话,如果你想尽可能地成为未来的证明,请使用 WebClient

RestTemplate API

中所述

NOTE: As of 5.0 this class is in maintenance mode, with only minor requests for changes and bugs to be accepted going forward. Please, consider using the org.springframework.web.reactive.client.WebClient which has a more modern API and supports sync, async, and streaming scenarios.

非反应性应用程序

如果您的应用程序是非反应性应用程序(不是 return 向调用客户端发送通量或单声道),如果您需要该值,则必须使用 block()。您当然可以在您的应用程序内部使用 MonoFlux 但最后您必须调用 block() 以获得您需要 return 给调用客户端的具体值.

非反应性应用程序使用 tomcatundertow 作为底层服务器实现,它遵循 servlet 规范,因此它将为每个请求分配 1 个线程,因此您不会获得性能提升使用响应式应用程序。

响应式应用程序

另一方面,如果您有一个响应式应用程序,则在任何情况下都不应在您的应用程序中调用 block()。阻塞正是​​它所说的,它会阻塞一个线程并阻止该线程执行直到它可以继续,这在反应世界中是不好的。

您也不应该在您的应用程序中调用 subscribe,除非您的应用程序是响应的最终消费者。例如,如果您正在调用 api 来获取数据并写入您的应用程序所连接的数据库。您的后端应用程序是最终消费者。如果外部客户端正在调用您的后端(例如反应、angular 应用程序、移动客户端等),则外部客户端是最终消费者,并且是订阅者。不是你。

这里的底层默认服务器实现是一个 netty 服务器,它是一个非 servlet、基于事件的服务器,它将 不会 为每个请求分配一个线程,即服务器本身与线程无关,任何可用的线程都将在任何请求期间随时处理任何事情。

webflux documentation 明确指出 servlet 3.1+ 支持的服务器 tomcat 和 jetty 都可以与 webflux 以及非 serlet 服务器 netty 和 undertow 一起使用。

我怎么知道我有什么应用程序?

Spring 声明如果在 class 路径上同时具有 spring-webspring-webflux,应用程序将支持 spring-web 并且默认启动具有底层 tomcat 服务器的非反应性应用程序。

如果需要,可以手动覆盖此行为,如 spring 所述。

Adding both spring-boot-starter-web and spring-boot-starter-webflux modules in your application results in Spring Boot auto-configuring Spring MVC, not WebFlux. This behavior has been chosen because many Spring developers add spring-boot-starter-webflux to their Spring MVC application to use the reactive WebClient. You can still enforce your choice by setting the chosen application type to SpringApplication.setWebApplicationType(WebApplicationType.REACTIVE).

The “Spring WebFlux Framework”

那么如何按照问题提供的代码实现WebClient呢?

@Retryable(maxAttempts = 4,
       value = java.net.ConnectException.class,
       backoff = @Backoff(delay = 3000, multiplier = 2))
public Mono<String> getResponse(String url) {
    return webClient.get()
            .uri(url)
            .exchange()
            .flatMap(response -> response.toEntity(String.class));
}

我会说这是最简单、侵入性最低的实现。您当然需要在 @Bean 中构建一个合适的网络客户端并将其自动连接到 class.

据我了解,您的 Spring 启动网络应用程序中需要 Non-blocking/asynchronous 和网络 servlet/sync api 调用。我有完全相同的问题。答案很简单。您需要同时拥有 spring-boot-starter-web 和 spring-boot-starter-webflux 依赖项。 然后,当您想要进行 async/non-blocking 调用时,请确保按如下方式调用 toFuture() 方法。

configWebClient.post().uri("/alarm/check")
            .body(BodyInserters.fromFormData(formData))
            .exchangeToMono(res -> {
                if (res.statusCode().equals(HttpStatus.OK)) {
                    return res.bodyToMono(Boolean.class);
                } else {
                    return res.createException().flatMap(Mono::error);
                }
            }).toFuture();