Spring WebClient - 如果在 doOnError 中抛出异常则停止重试

Spring WebClient - Stop retrying if an exception is thrown in the doOnError

我有以下代码来发出将重试最多次数的请求。此请求需要授权header,我正在缓存此信息以防止此方法每次都调用方法来检索此信息。

我想做的是:

  1. 调用 myMethod 时,我首先检索所调用服务的登录信息,在大多数情况下,当调用 getAuthorizationHeaderValue 方法时,这些信息将来自缓存。
  2. 在 Web 客户端中,如果发送此请求的响应 returns 是一个 4xx 响应,我需要在重试请求之前再次登录到我正在调用的服务。为此,我调用 tryToLoginAgain 方法再次设置 header 的值。
  3. 完成后,请求的重试应该可以正常工作,因为 header 已经设置。
  4. 如果再次调用登录失败,我需要停止重试,因为重试请求没有用。
public <T> T myMethod(...) {
    ...

    try {
        AtomicReference<String> headerValue = new AtomicReference<>(loginService.getAuthorizationHeaderValue());

        Mono<T> monoResult = webclient.get()
                .uri(uri)
                .accept(MediaType.APPLICATION_JSON)
                .header(HttpHeaders.AUTHORIZATION, headerValue.get())
                .retrieve()
                .onStatus(HttpStatus::is4xxClientError, response -> throwHttpClientLoginException())
                .bodyToMono(type)
                .doOnError(HttpClientLoginException.class, e -> tryToLoginAgain(headerValue))
                .retryWhen(Retry.backoff(MAX_NUMBER_RETRIES, Duration.ofSeconds(5)));

        result = monoResult.block();
    } catch(Exception e) {
        throw new HttpClientException("There was an error while sending the request");
    }
    return result;
}

...

private Mono<Throwable> throwHttpClientLoginException() {
    return Mono.error(new HttpClientLoginException("Existing Authorization failed"));
}

private void tryToLoginAgain(AtomicReference<String> headerValue) {
    loginService.removeAccessTokenFromCache();
    
    headerValue.set(loginService.getAuthorizationHeaderValue());
}

我有一些单元测试,happy path 工作正常(第一次未授权,尝试再次登录并再次发送请求)但是登录根本不起作用的场景不起作用。

我认为如果 tryToLoginAgain 方法抛出一个异常,该异常将被我在 myMethod 中的捕获捕获,但它永远不会到达那里,它只是再次重试请求。有什么办法可以做我想做的事吗?

所以最后我找到了一种方法来做我想做的事情,现在代码如下所示:

public <T> T myMethod() {
    try {
        HttpHeaders headers = new HttpHeaders();
        headers.setBearerAuth(getAuthorizationHeaderValue());

        final RetryBackoffSpec retrySpec = Retry.backoff(MAX_NUMBER_RETRIES, Duration.ofSeconds(5))
            .doBeforeRetry(retrySignal -> {
                //When retrying, if this was a login error, try to login again
                if (retrySignal.failure() instanceof HttpClientLoginException) {
                    tryToLoginAgain(headers);
                }
            });

        Mono<T> monoResult = Mono.defer(() ->
                getRequestFromMethod(httpMethod, uri, body, headers)
                    .retrieve()
                    .onStatus(HttpStatus::is4xxClientError, response -> throwHttpClientLoginException())
                    .bodyToMono(type)
            )
            .retryWhen(retrySpec);

        result = monoResult.block();
    } catch (Exception e) {
        String requestUri = uri != null ?
            uri.toString() :
            endpoint;

        log.error("There was an error while sending the request [{}] [{}]", httpMethod.name(), requestUri);

        throw new HttpClientException("There was an error while sending the request [" + httpMethod.name() +
            "] [" + requestUri + "]");
    }

    return result;
}

private void tryToLoginAgain(HttpHeaders httpHeaders) {
    //If there was an 4xx error, let's evict the cache to remove the existing access_token (if it exists)
    loginService.removeAccessTokenFromCache();
    //And let's try to login again
    httpHeaders.setBearerAuth(getAuthorizationHeaderValue());
}

private Mono<Throwable> throwHttpClientLoginException() {
    return Mono.error(new HttpClientLoginException("Existing Authorization failed"));
}

private WebClient.RequestHeadersSpec getRequestFromMethod(HttpMethod httpMethod, URI uri, Object body, HttpHeaders headers) {
    switch (httpMethod) {
        case GET:
            return webClient.get()
                .uri(uri)
                .headers(httpHeaders -> httpHeaders.addAll(headers))
                .accept(MediaType.APPLICATION_JSON);
        case POST:
            return body == null ?
                webClient.post()
                    .uri(uri)
                    .headers(httpHeaders -> httpHeaders.addAll(headers))
                    .accept(MediaType.APPLICATION_JSON) :
                webClient.post()
                    .uri(uri)
                    .headers(httpHeaders -> httpHeaders.addAll(headers))
                    .accept(MediaType.APPLICATION_JSON)
                    .contentType(MediaType.APPLICATION_JSON)
                    .bodyValue(body);
        case PUT:
            return body == null ?
                webClient.put()
                    .uri(uri)
                    .headers(httpHeaders -> httpHeaders.addAll(headers))
                    .accept(MediaType.APPLICATION_JSON) :
                webClient.put()
                    .uri(uri)
                    .headers(httpHeaders -> httpHeaders.addAll(headers))
                    .accept(MediaType.APPLICATION_JSON)
                    .contentType(MediaType.APPLICATION_JSON)
                    .bodyValue(body);
        case DELETE:
            return webClient.delete()
                .uri(uri)
                .headers(httpHeaders -> httpHeaders.addAll(headers))
                .accept(MediaType.APPLICATION_JSON);
        default:
            log.error("Method [{}] is not supported", httpMethod.name());
            throw new HttpClientException("Method [" + httpMethod.name() + "] is not supported");
    }
}

private String getAuthorizationHeaderValue() {
    return loginService.retrieveAccessToken();
}

通过使用 Mono.defer(),我可以重试该 Mono 并确保我更改了将用于 WebClient 的 headers。重试规范将检查异常是否为 HttpClientLoginException 类型,当请求获得 4xx 状态代码时抛出,在这种情况下它将尝试再次登录并设置 header 以进行下一次重试。如果状态代码不同,它将使用相同的授权重试。

另外,如果我们再次尝试登录时出现错误,会被catch捕获,不会再重试。