阻止 Spring WebFlux WebClient 在新订阅时执行新交换
Prevent Spring WebFlux WebClient from performing new exchange upon new subscription
DefaultWebClient
将 exchange
实现为:
@Override
public Mono<ClientResponse> exchange() {
ClientRequest request = (this.inserter != null ?
initRequestBuilder().body(this.inserter).build() :
initRequestBuilder().build());
return Mono.defer(() -> exchangeFunction.exchange(request)
.checkpoint("Request to " + this.httpMethod.name() + " " + this.uri + " [DefaultWebClient]")
.switchIfEmpty(NO_HTTP_CLIENT_RESPONSE_ERROR));
}
正如您在上面看到的,exchangeFunction.exchange
调用被 Mono.defer
包装,因此只要订阅返回的 Mono<ClientResponse>
.
就会执行它
但是,在我非常具体的用例中,鉴于以下简化代码,我不想重新执行交换:
final WebClient webClient = WebClient.create("http://some-base-url");
final AtomicReference<Mono<ClientResponse>> responseRef = new AtomicReference<>(null);
Flux.fromIterable(Arrays.asList(1, 2, 3))
.flatMap(num -> {
if (...some condition...) {
return responseRef.updateAndGet(response -> response == null
? webClient.get().uri("/some-path").exchange()
: response)
.flatMap(response -> {...do something with num and response...});
} else {
return Mono.just(...something...);
}
})
...
正如您在上面的用例中看到的,我尝试使用 AtomicReference
延迟获取 Mono<ClientResponse>
,这样就不会一次又一次地发出 HTTP 请求。
这没有按预期工作,因为订阅 exchange()
发布的 Mono<ClientResponse>
的 do-something-with-num-and-response flatMap
将触发其内部 exchangeFunction.exchange
一次又一次。
我可以用一些东西包裹已发布的 Mono<ClientResponse>
来抵消 Mono.defer
的影响吗?或者有没有办法在不改变我的用例代码结构的情况下解决这个问题?
========== 可行的解决方案 ==========
受已接受答案的启发,我更改了代码如下:
final WebClient webClient = WebClient.create("http://some-base-url");
final AtomicReference<Mono<ClientResponse>> responseRef = new AtomicReference<>(null);
Flux.fromIterable(Arrays.asList(1, 2, 3))
.flatMap(num -> {
if (...some condition...) {
return responseRef.updateAndGet(response -> response == null
? webClient.get().uri("/some-path").exchange().cache()
: response)
.flatMap(response -> {...do something with num and response...});
} else {
return Mono.just(...something...);
}
})
...
注意exchange()
后面的cache()
。 Mono
的缓存将其变成热源并缓存最后发出的信号以供更多订阅者使用。 Completion 和 Error 也会重播。
你可以这样做:
final WebClient webClient = WebClient.create("http://localhost:8080");
Flux<String> data = webClient
.get()
.uri("test")
.exchange()
//do whatever you need on response
.flatMap(clientResponse -> clientResponse.bodyToMono(String.class))
.flux()
//Turn this Flux into a hot source and cache last emitted signals for further Subscriber
.replay()
//Connects this ConnectableFlux to the upstream source when the first Subscriber subscribes.
.autoConnect();
Flux.range(0, 10).flatMap(integer -> data).log().subscribe();
你可以这样做:
Mono<String> data = webClient
.get()
.uri("test")
.exchange()
.flatMap(clientResponse -> clientResponse.bodyToMono(String.class))
.cache();
Flux.range(0, 10).flatMap(integer -> {
if (integer % 2 == 0)
return data;
else
return Mono.empty();
}).log().subscribe();
DefaultWebClient
将 exchange
实现为:
@Override
public Mono<ClientResponse> exchange() {
ClientRequest request = (this.inserter != null ?
initRequestBuilder().body(this.inserter).build() :
initRequestBuilder().build());
return Mono.defer(() -> exchangeFunction.exchange(request)
.checkpoint("Request to " + this.httpMethod.name() + " " + this.uri + " [DefaultWebClient]")
.switchIfEmpty(NO_HTTP_CLIENT_RESPONSE_ERROR));
}
正如您在上面看到的,exchangeFunction.exchange
调用被 Mono.defer
包装,因此只要订阅返回的 Mono<ClientResponse>
.
但是,在我非常具体的用例中,鉴于以下简化代码,我不想重新执行交换:
final WebClient webClient = WebClient.create("http://some-base-url");
final AtomicReference<Mono<ClientResponse>> responseRef = new AtomicReference<>(null);
Flux.fromIterable(Arrays.asList(1, 2, 3))
.flatMap(num -> {
if (...some condition...) {
return responseRef.updateAndGet(response -> response == null
? webClient.get().uri("/some-path").exchange()
: response)
.flatMap(response -> {...do something with num and response...});
} else {
return Mono.just(...something...);
}
})
...
正如您在上面的用例中看到的,我尝试使用 AtomicReference
延迟获取 Mono<ClientResponse>
,这样就不会一次又一次地发出 HTTP 请求。
这没有按预期工作,因为订阅 exchange()
发布的 Mono<ClientResponse>
的 do-something-with-num-and-response flatMap
将触发其内部 exchangeFunction.exchange
一次又一次。
我可以用一些东西包裹已发布的 Mono<ClientResponse>
来抵消 Mono.defer
的影响吗?或者有没有办法在不改变我的用例代码结构的情况下解决这个问题?
========== 可行的解决方案 ==========
受已接受答案的启发,我更改了代码如下:
final WebClient webClient = WebClient.create("http://some-base-url");
final AtomicReference<Mono<ClientResponse>> responseRef = new AtomicReference<>(null);
Flux.fromIterable(Arrays.asList(1, 2, 3))
.flatMap(num -> {
if (...some condition...) {
return responseRef.updateAndGet(response -> response == null
? webClient.get().uri("/some-path").exchange().cache()
: response)
.flatMap(response -> {...do something with num and response...});
} else {
return Mono.just(...something...);
}
})
...
注意exchange()
后面的cache()
。 Mono
的缓存将其变成热源并缓存最后发出的信号以供更多订阅者使用。 Completion 和 Error 也会重播。
你可以这样做:
final WebClient webClient = WebClient.create("http://localhost:8080");
Flux<String> data = webClient
.get()
.uri("test")
.exchange()
//do whatever you need on response
.flatMap(clientResponse -> clientResponse.bodyToMono(String.class))
.flux()
//Turn this Flux into a hot source and cache last emitted signals for further Subscriber
.replay()
//Connects this ConnectableFlux to the upstream source when the first Subscriber subscribes.
.autoConnect();
Flux.range(0, 10).flatMap(integer -> data).log().subscribe();
你可以这样做:
Mono<String> data = webClient
.get()
.uri("test")
.exchange()
.flatMap(clientResponse -> clientResponse.bodyToMono(String.class))
.cache();
Flux.range(0, 10).flatMap(integer -> {
if (integer % 2 == 0)
return data;
else
return Mono.empty();
}).log().subscribe();