如何在重试时重新调用 WebClient 的 ExchangeFilterFunction
How to reinvoke WebClient's ExchangeFilterFunction on retry
当使用 Reactor 的 retry(..) 运算符时,WebClient 的交换过滤器函数(ExchangeFilterFunction)在重试时不会被再次触发。我理解其中的原因,但问题在于:当该函数(如下所示)生成带有过期时间的身份验证令牌时,请求在被“重试”的过程中令牌可能已经过期,因为重试期间不会重新调用过滤器函数。有没有办法在每次重试时重新生成令牌?
下面的 AuthClientExchangeFunction 会生成一个带有过期时间的身份验证令牌 (JWT)。
public class AuthClientExchangeFunction implements ExchangeFilterFunction {
private final TokenProvider tokenProvider;
public IntraAuthWebClientExchangeFunction(TokenProvider tokenProvider) {
this.tokenProvider = tokenProvider;
}
@Override
public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
String jwt = tokenProvider.getToken();
return next.exchange(withBearer(request, jwt));
}
private ClientRequest withBearer(ClientRequest request, String jwt){
return ClientRequest.from(request)
.headers(headers -> headers.set(HttpHeaders.AUTHORIZATION, "Bearer "+ jwt))
.build();
}
}
假设一个令牌的有效期为 2999 毫秒 -> 每个重试请求都因 401 而失败。
// Build a client whose requests pass through the bearer-token filter.
AuthClientExchangeFunction authFilter = new AuthClientExchangeFunction(tokenProvider);
WebClient client = WebClient.builder().filter(authFilter).build();

// GET /api, decode the body as a String, and allow one retry with a 3000 ms backoff argument.
Duration backoff = Duration.ofMillis(3000);
client.get()
        .uri("/api")
        .retrieve()
        .bodyToMono(String.class)
        .retryBackoff(1, backoff);
编辑:下面是一个可执行的示例(executable example)。
/**
 * Checks how many times a {@link ExchangeFilterFunction} is invoked when a WebClient call
 * is retried with {@code retryBackoff}.
 *
 * <p>The mock server answers three times with HTTP 500 and then once with HTTP 200; the test
 * asserts that the counting filter was invoked four times, i.e. once per request sent.
 */
@SpringBootTest
@RunWith(SpringRunner.class)
public class RetryApplicationTests {

    private static final MockWebServer server = new MockWebServer();

    private final RquestCountingFilterFunction requestCounter = new RquestCountingFilterFunction();

    @AfterClass
    public static void shutdown() throws IOException {
        server.shutdown();
    }

    @Test
    public void test() {
        // Queue three failing responses followed by a single successful one.
        for (int failure = 0; failure < 3; failure++) {
            server.enqueue(new MockResponse().setResponseCode(500).setBody("{}"));
        }
        server.enqueue(new MockResponse().setResponseCode(200).setBody("{}"));

        String baseUrl = server.url("/api").toString();
        WebClient webClient = WebClient.builder()
                .baseUrl(baseUrl)
                .filter(requestCounter)
                .build();

        // Up to three retries, with 1000 ms passed as the backoff argument.
        Duration backoff = Duration.ofMillis(1000);
        Mono<String> responseMono1 = webClient.get()
                .uri("/api")
                .retrieve()
                .bodyToMono(String.class)
                .retryBackoff(3, backoff);

        StepVerifier.create(responseMono1)
                .expectNextCount(1)
                .verifyComplete();

        // One filter invocation is expected per request: 3 failures + 1 success.
        assertThat(requestCounter.count()).isEqualTo(4);
    }

    /** Filter that logs and counts each invocation before delegating unchanged to the next function. */
    static class RquestCountingFilterFunction implements ExchangeFilterFunction {

        final Logger log = LoggerFactory.getLogger(getClass());
        final AtomicInteger counter = new AtomicInteger();

        @Override
        public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
            int invocation = counter.incrementAndGet();
            log.info("Sending {} request to {} {}", invocation, request.method(), request.url());
            return next.exchange(request);
        }

        /** Returns the number of times {@code filter} has been invoked so far. */
        int count() {
            return counter.get();
        }
    }
}
输出
MockWebServer[44855] starting to accept connections
Sending 1 request to GET http://localhost:44855/api/api
MockWebServer[44855] received request: GET /api/api HTTP/1.1 and responded: HTTP/1.1 500 Server Error
MockWebServer[44855] received request: GET /api/api HTTP/1.1 and responded: HTTP/1.1 500 Server Error
MockWebServer[44855] received request: GET /api/api HTTP/1.1 and responded: HTTP/1.1 500 Server Error
MockWebServer[44855] received request: GET /api/api HTTP/1.1 and responded: HTTP/1.1 200 OK
org.junit.ComparisonFailure:
Expected :4
Actual :1
您需要将 Spring Boot 版本升级到 2.2.0.RELEASE。在此之前的版本中,retry() 不会重新调用交换过滤器函数(ExchangeFilterFunction)。
我已经使用简单的代码(在 Kotlin 中)对此进行了测试。
/**
 * Spring component that performs a GET request through a [WebClient] configured with
 * [PrintExchangeFunction] and retries the call when it fails.
 */
@Component
class AnswerPub {

    /** Client for `https://jsonplaceholder.typicode.com` with the printing filter installed. */
    val webClient = WebClient.builder()
        .filter(PrintExchangeFunction())
        .baseUrl("https://jsonplaceholder.typicode.com")
        .build()

    /**
     * Requests `/todos2/1` and decodes the response body into a [User].
     *
     * The call is retried up to 2 times when the failure is an [Exception].
     * NOTE(review): the path looks intentionally invalid so that retries are triggered — confirm.
     */
    fun productInfo(): Mono<User> =
        webClient
            .get()
            .uri("/todos2/1")
            .retrieve()
            .bodyToMono(User::class.java)
            .retry(2) { failure -> failure is Exception }

    /** Payload type the response body is decoded into. */
    data class User(
        val id: String,
        val userId: String,
        val title: String,
        val completed: Boolean
    )
}
/**
 * [ExchangeFilterFunction] that prints `Filtered` every time it is invoked and then
 * forwards the unmodified request to the next exchange function.
 */
class PrintExchangeFunction : ExchangeFilterFunction {

    override fun filter(request: ClientRequest, next: ExchangeFunction): Mono<ClientResponse> {
        // Marker output: one line per invocation of this filter.
        println("Filtered")
        val response: Mono<ClientResponse> = next.exchange(request)
        return response
    }
}
控制台输出如下:
2019-10-29 09:31:55.912 INFO 12206 --- [ main] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 8080
2019-10-29 09:31:55.917 INFO 12206 --- [ main] c.e.s.SpringWfDemoApplicationKt : Started SpringWfDemoApplicationKt in 3.19 seconds (JVM running for 4.234)
Filtered
Filtered
Filtered
所以在我的例子中,每次都会调用交换函数。
当使用 Reactor 的 retry(..) 运算符时,WebClient 的交换过滤器函数(ExchangeFilterFunction)在重试时不会被再次触发。我理解其中的原因,但问题在于:当该函数(如下所示)生成带有过期时间的身份验证令牌时,请求在被“重试”的过程中令牌可能已经过期,因为重试期间不会重新调用过滤器函数。有没有办法在每次重试时重新生成令牌?
下面的 AuthClientExchangeFunction 会生成一个带有过期时间的身份验证令牌 (JWT)。
public class AuthClientExchangeFunction implements ExchangeFilterFunction {
private final TokenProvider tokenProvider;
public IntraAuthWebClientExchangeFunction(TokenProvider tokenProvider) {
this.tokenProvider = tokenProvider;
}
@Override
public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
String jwt = tokenProvider.getToken();
return next.exchange(withBearer(request, jwt));
}
private ClientRequest withBearer(ClientRequest request, String jwt){
return ClientRequest.from(request)
.headers(headers -> headers.set(HttpHeaders.AUTHORIZATION, "Bearer "+ jwt))
.build();
}
}
假设一个令牌的有效期为 2999 毫秒 -> 每个重试请求都因 401 而失败。
// Build a client whose requests pass through the bearer-token filter.
AuthClientExchangeFunction authFilter = new AuthClientExchangeFunction(tokenProvider);
WebClient client = WebClient.builder().filter(authFilter).build();

// GET /api, decode the body as a String, and allow one retry with a 3000 ms backoff argument.
Duration backoff = Duration.ofMillis(3000);
client.get()
        .uri("/api")
        .retrieve()
        .bodyToMono(String.class)
        .retryBackoff(1, backoff);
编辑:下面是一个可执行的示例(executable example)。
/**
 * Checks how many times a {@link ExchangeFilterFunction} is invoked when a WebClient call
 * is retried with {@code retryBackoff}.
 *
 * <p>The mock server answers three times with HTTP 500 and then once with HTTP 200; the test
 * asserts that the counting filter was invoked four times, i.e. once per request sent.
 */
@SpringBootTest
@RunWith(SpringRunner.class)
public class RetryApplicationTests {

    private static final MockWebServer server = new MockWebServer();

    private final RquestCountingFilterFunction requestCounter = new RquestCountingFilterFunction();

    @AfterClass
    public static void shutdown() throws IOException {
        server.shutdown();
    }

    @Test
    public void test() {
        // Queue three failing responses followed by a single successful one.
        for (int failure = 0; failure < 3; failure++) {
            server.enqueue(new MockResponse().setResponseCode(500).setBody("{}"));
        }
        server.enqueue(new MockResponse().setResponseCode(200).setBody("{}"));

        String baseUrl = server.url("/api").toString();
        WebClient webClient = WebClient.builder()
                .baseUrl(baseUrl)
                .filter(requestCounter)
                .build();

        // Up to three retries, with 1000 ms passed as the backoff argument.
        Duration backoff = Duration.ofMillis(1000);
        Mono<String> responseMono1 = webClient.get()
                .uri("/api")
                .retrieve()
                .bodyToMono(String.class)
                .retryBackoff(3, backoff);

        StepVerifier.create(responseMono1)
                .expectNextCount(1)
                .verifyComplete();

        // One filter invocation is expected per request: 3 failures + 1 success.
        assertThat(requestCounter.count()).isEqualTo(4);
    }

    /** Filter that logs and counts each invocation before delegating unchanged to the next function. */
    static class RquestCountingFilterFunction implements ExchangeFilterFunction {

        final Logger log = LoggerFactory.getLogger(getClass());
        final AtomicInteger counter = new AtomicInteger();

        @Override
        public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
            int invocation = counter.incrementAndGet();
            log.info("Sending {} request to {} {}", invocation, request.method(), request.url());
            return next.exchange(request);
        }

        /** Returns the number of times {@code filter} has been invoked so far. */
        int count() {
            return counter.get();
        }
    }
}
输出
MockWebServer[44855] starting to accept connections
Sending 1 request to GET http://localhost:44855/api/api
MockWebServer[44855] received request: GET /api/api HTTP/1.1 and responded: HTTP/1.1 500 Server Error
MockWebServer[44855] received request: GET /api/api HTTP/1.1 and responded: HTTP/1.1 500 Server Error
MockWebServer[44855] received request: GET /api/api HTTP/1.1 and responded: HTTP/1.1 500 Server Error
MockWebServer[44855] received request: GET /api/api HTTP/1.1 and responded: HTTP/1.1 200 OK
org.junit.ComparisonFailure:
Expected :4
Actual :1
您需要将 Spring Boot 版本升级到 2.2.0.RELEASE。在此之前的版本中,retry() 不会重新调用交换过滤器函数(ExchangeFilterFunction)。
我已经使用简单的代码(在 Kotlin 中)对此进行了测试。
/**
 * Spring component that performs a GET request through a [WebClient] configured with
 * [PrintExchangeFunction] and retries the call when it fails.
 */
@Component
class AnswerPub {

    /** Client for `https://jsonplaceholder.typicode.com` with the printing filter installed. */
    val webClient = WebClient.builder()
        .filter(PrintExchangeFunction())
        .baseUrl("https://jsonplaceholder.typicode.com")
        .build()

    /**
     * Requests `/todos2/1` and decodes the response body into a [User].
     *
     * The call is retried up to 2 times when the failure is an [Exception].
     * NOTE(review): the path looks intentionally invalid so that retries are triggered — confirm.
     */
    fun productInfo(): Mono<User> =
        webClient
            .get()
            .uri("/todos2/1")
            .retrieve()
            .bodyToMono(User::class.java)
            .retry(2) { failure -> failure is Exception }

    /** Payload type the response body is decoded into. */
    data class User(
        val id: String,
        val userId: String,
        val title: String,
        val completed: Boolean
    )
}
/**
 * [ExchangeFilterFunction] that prints `Filtered` every time it is invoked and then
 * forwards the unmodified request to the next exchange function.
 */
class PrintExchangeFunction : ExchangeFilterFunction {

    override fun filter(request: ClientRequest, next: ExchangeFunction): Mono<ClientResponse> {
        // Marker output: one line per invocation of this filter.
        println("Filtered")
        val response: Mono<ClientResponse> = next.exchange(request)
        return response
    }
}
控制台输出如下:
2019-10-29 09:31:55.912 INFO 12206 --- [ main] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 8080
2019-10-29 09:31:55.917 INFO 12206 --- [ main] c.e.s.SpringWfDemoApplicationKt : Started SpringWfDemoApplicationKt in 3.19 seconds (JVM running for 4.234)
Filtered
Filtered
Filtered
所以在我的例子中,每次都会调用交换函数。