在 Spring WebFlux Web 应用程序中缓存来自 WebClient 调用的 Mono 结果

Cache the result of a Mono from a WebClient call in a Spring WebFlux web application

我希望缓存 WebClient 调用的结果 Mono(仅当它成功时)。

通过阅读项目反应器插件文档,我觉得 CacheMono 不是一个很好的选择,因为它也缓存了我不想要的错误。

因此,我没有使用 CacheMono,而是执行以下操作:

Cache<MyRequestObject, Mono<MyResponseObject>> myCaffeineCache = 
    Caffeine.newBuilder()
            .maximumSize(100)
            .expireAfterWrite(Duration.ofSeconds(60))
            .build();

MyRequestObject myRequestObject = ...;

Mono<MyResponseObject> myResponseObject = myCaffeineCache.get(myRequestObject,
    requestAsKey -> WebClient.create()
                             .post()
                             .uri("http://www.example.com")
                             .syncBody(requestAsKey)
                             .retrieve()
                             .bodyToMono(MyResponseObject.class)
                             .cache()
                             .doOnError(t -> myCaffeineCache.invalidate(requestAsKey)));

这里我在 Mono 上调用缓存,然后将其添加到咖啡因缓存中。

任何错误都会进入doOnError使缓存失效。

这是缓存 Mono WebClient 响应的有效方法吗?

这是极少数实际允许您调用非响应式库并用响应式类型包装它们,并在像 doOnXYZ 这样的副作用运算符中完成处理的用例之一,因为:

  • Caffeine 是一种内存缓存,据我所知没有 I/O 涉及
  • 缓存通常不提供关于缓存值的强有力保证(这是非常“即发即忘”)

在这种情况下,您可以查询缓存以查看是否存在缓存版本(将其包装并 return 立即),并在 doOn 运算符中缓存成功的真实响应,像这样:

public class MyService {

    private WebClient client;

    private Cache<MyRequestObject, MyResponseObject> myCaffeineCache;

    public MyService() {
        this.client = WebClient.create();
        this.myCaffeineCache = Caffeine.newBuilder().maximumSize(100)
          .expireAfterWrite(Duration.ofSeconds(60)).build();
    }

    public Mono<MyResponseObject> fetchResponse(MyRequestObject request) {

        MyResponseObject cachedVersion = this.myCaffeineCache.get(myRequestObject);
        if (cachedVersion != null) {
           return Mono.just(cachedVersion);
        } else {
           return this.client.post()
                         .uri("http://www.example.com")
                         .syncBody(request.getKey())
                         .retrieve()
                         .bodyToMono(MyResponseObject.class)
                         .doOnNext(response -> this.myCaffeineCache.put(request.getKey(), response));
    }
}

请注意,我不会在这里缓存反应类型,因为一旦值被缓存return,就不会涉及 I/O 或背压。相反,订阅和其他反应流约束使事情变得更加困难。

此外,您对 cache 运算符的看法是正确的,因为它不是关于缓存值本身,而是关于重播发生在其他订阅者身上的事情。我相信 cachereplay 运算符实际上是 Flux.

的同义词

实际上,您不必使用 CacheMono 来保存错误。

private Cache<MyRequestObject, MyResponseObject> myCaffeineCache;

...

Mono<MyResponseObject> myResponseObject =
        CacheMono.lookup(key -> Mono.justOrEmpty(myCaffeineCache.getIfPresent(key))
                .map(Signal::next), myRequestObject)
                .onCacheMissResume(() -> /* Your web client or other Mono here */)
                .andWriteWith((key, signal) -> Mono.fromRunnable(() ->
                        Optional.ofNullable(signal.get())
                                .ifPresent(value -> myCaffeineCache.put(key, value))));

当您切换到外部缓存时,这可能会有用。不要忘记为外部缓存使用反应式客户端。