在 Spring WebFlux Web 应用程序中缓存来自 WebClient 调用的 Mono 结果
Cache the result of a Mono from a WebClient call in a Spring WebFlux web application
我希望缓存 WebClient 调用的结果 Mono
(仅当它成功时)。
通过阅读项目反应器插件文档,我觉得 CacheMono 不是一个很好的选择,因为它也缓存了我不想要的错误。
因此,我没有使用 CacheMono
,而是执行以下操作:
Cache<MyRequestObject, Mono<MyResponseObject>> myCaffeineCache =
Caffeine.newBuilder()
.maximumSize(100)
.expireAfterWrite(Duration.ofSeconds(60))
.build();
MyRequestObject myRequestObject = ...;
Mono<MyResponseObject> myResponseObject = myCaffeineCache.get(myRequestObject,
requestAsKey -> WebClient.create()
.post()
.uri("http://www.example.com")
.syncBody(requestAsKey)
.retrieve()
.bodyToMono(MyResponseObject.class)
.cache()
.doOnError(t -> myCaffeineCache.invalidate(requestAsKey)));
这里我在 Mono
上调用缓存,然后将其添加到咖啡因缓存中。
任何错误都会进入doOnError
使缓存失效。
这是缓存 Mono
WebClient 响应的有效方法吗?
这是极少数实际允许您调用非响应式库并用响应式类型包装它们,并在像 doOnXYZ
这样的副作用运算符中完成处理的用例之一,因为:
- Caffeine 是一种内存缓存,据我所知没有 I/O 涉及
- 缓存通常不提供关于缓存值的强有力保证(这是非常“即发即忘”)
在这种情况下,您可以查询缓存以查看是否存在缓存版本(将其包装并 return 立即),并在 doOn
运算符中缓存成功的真实响应,像这样:
public class MyService {
private WebClient client;
private Cache<MyRequestObject, MyResponseObject> myCaffeineCache;
public MyService() {
this.client = WebClient.create();
this.myCaffeineCache = Caffeine.newBuilder().maximumSize(100)
.expireAfterWrite(Duration.ofSeconds(60)).build();
}
public Mono<MyResponseObject> fetchResponse(MyRequestObject request) {
MyResponseObject cachedVersion = this.myCaffeineCache.get(myRequestObject);
if (cachedVersion != null) {
return Mono.just(cachedVersion);
} else {
return this.client.post()
.uri("http://www.example.com")
.syncBody(request.getKey())
.retrieve()
.bodyToMono(MyResponseObject.class)
.doOnNext(response -> this.myCaffeineCache.put(request.getKey(), response));
}
}
请注意,我不会在这里缓存反应类型,因为一旦值被缓存return,就不会涉及 I/O 或背压。相反,订阅和其他反应流约束使事情变得更加困难。
此外,您对 cache
运算符的看法是正确的,因为它不是关于缓存值本身,而是关于重播发生在其他订阅者身上的事情。我相信 cache
和 replay
运算符实际上是 Flux
.
的同义词
实际上,您不必使用 CacheMono 来保存错误。
private Cache<MyRequestObject, MyResponseObject> myCaffeineCache;
...
Mono<MyResponseObject> myResponseObject =
CacheMono.lookup(key -> Mono.justOrEmpty(myCaffeineCache.getIfPresent(key))
.map(Signal::next), myRequestObject)
.onCacheMissResume(() -> /* Your web client or other Mono here */)
.andWriteWith((key, signal) -> Mono.fromRunnable(() ->
Optional.ofNullable(signal.get())
.ifPresent(value -> myCaffeineCache.put(key, value))));
当您切换到外部缓存时,这可能会有用。不要忘记为外部缓存使用反应式客户端。
我希望缓存 WebClient 调用的结果 Mono
(仅当它成功时)。
通过阅读项目反应器插件文档,我觉得 CacheMono 不是一个很好的选择,因为它也缓存了我不想要的错误。
因此,我没有使用 CacheMono
,而是执行以下操作:
Cache<MyRequestObject, Mono<MyResponseObject>> myCaffeineCache =
Caffeine.newBuilder()
.maximumSize(100)
.expireAfterWrite(Duration.ofSeconds(60))
.build();
MyRequestObject myRequestObject = ...;
Mono<MyResponseObject> myResponseObject = myCaffeineCache.get(myRequestObject,
requestAsKey -> WebClient.create()
.post()
.uri("http://www.example.com")
.syncBody(requestAsKey)
.retrieve()
.bodyToMono(MyResponseObject.class)
.cache()
.doOnError(t -> myCaffeineCache.invalidate(requestAsKey)));
这里我在 Mono
上调用缓存,然后将其添加到咖啡因缓存中。
任何错误都会进入doOnError
使缓存失效。
这是缓存 Mono
WebClient 响应的有效方法吗?
这是极少数实际允许您调用非响应式库并用响应式类型包装它们,并在像 doOnXYZ
这样的副作用运算符中完成处理的用例之一,因为:
- Caffeine 是一种内存缓存,据我所知没有 I/O 涉及
- 缓存通常不提供关于缓存值的强有力保证(这是非常“即发即忘”)
在这种情况下,您可以查询缓存以查看是否存在缓存版本(将其包装并 return 立即),并在 doOn
运算符中缓存成功的真实响应,像这样:
public class MyService {
private WebClient client;
private Cache<MyRequestObject, MyResponseObject> myCaffeineCache;
public MyService() {
this.client = WebClient.create();
this.myCaffeineCache = Caffeine.newBuilder().maximumSize(100)
.expireAfterWrite(Duration.ofSeconds(60)).build();
}
public Mono<MyResponseObject> fetchResponse(MyRequestObject request) {
MyResponseObject cachedVersion = this.myCaffeineCache.get(myRequestObject);
if (cachedVersion != null) {
return Mono.just(cachedVersion);
} else {
return this.client.post()
.uri("http://www.example.com")
.syncBody(request.getKey())
.retrieve()
.bodyToMono(MyResponseObject.class)
.doOnNext(response -> this.myCaffeineCache.put(request.getKey(), response));
}
}
请注意,我不会在这里缓存反应类型,因为一旦值被缓存return,就不会涉及 I/O 或背压。相反,订阅和其他反应流约束使事情变得更加困难。
此外,您对 cache
运算符的看法是正确的,因为它不是关于缓存值本身,而是关于重播发生在其他订阅者身上的事情。我相信 cache
和 replay
运算符实际上是 Flux
.
实际上,您不必使用 CacheMono 来保存错误。
private Cache<MyRequestObject, MyResponseObject> myCaffeineCache;
...
Mono<MyResponseObject> myResponseObject =
CacheMono.lookup(key -> Mono.justOrEmpty(myCaffeineCache.getIfPresent(key))
.map(Signal::next), myRequestObject)
.onCacheMissResume(() -> /* Your web client or other Mono here */)
.andWriteWith((key, signal) -> Mono.fromRunnable(() ->
Optional.ofNullable(signal.get())
.ifPresent(value -> myCaffeineCache.put(key, value))));
当您切换到外部缓存时,这可能会有用。不要忘记为外部缓存使用反应式客户端。