在 Spring Webflux Mono 中缓存并行请求

Caching parallel request in Spring Webflux Mono

我们正在使用 spring webflux(项目反应器),作为要求的一部分,我们需要从我们的服务器调用一个 API。

对于 API 调用,我们需要缓存响应。所以我们使用 Mono.cache 运算符。

它缓存响应 Mono<ResponseDto> 并且下次发生相同的 API 调用时,它将从缓存中获取它。以下是示例实现

public Mono<ResponseDto> getResponse() {
    if (res == null) {
      res =
          fetchResponse()
              .onErrorMap(Exception.class, (error) -> new CustomException())
              .cache(
                  r -> Duration.ofSeconds(r.expiresIn()),
                  error -> Duration.ZERO,
                  () -> Duration.ZERO);
    }
    return res;
  }

问题是如果服务器同时调用相同的 API 调用两次(例如 Mono.zip),那么响应不会被缓存,我们实际上会调用它两次。

这个问题有现成的解决方案吗?除了缓存 Response,我们是否可以缓存 Mono 本身,以便两个请求都订阅同一个 Mono,因此两个请求都在单个 API 调用响应之后执行?

它也应该适用于顺序执行 - 恐怕如果我们缓存 Mono,那么一旦请求完成,订阅就结束了,没有其他进程可以订阅它。

您可以在构造函数中初始化 Mono(假设它不依赖于任何请求时间参数)。使用 cache 运算符将阻止对源的多个订阅。

class MyService {
    private final Mono<ResponseBodyDto> response;

    public MyService() {
        response = fetchResponse()
            .onErrorMap(Exception.class, (error) -> new CustomException())
            .cache(
                r -> Duration.ofSeconds(r.expiresIn()),
                error -> Duration.ZERO,
                () -> Duration.ZERO);
    }

    public Mono<ResponseDto> getResponse() {
        return response;
    }
}

如果对请求时间参数有依赖性,您应该考虑一些自定义缓存解决方案。

您可以使用 io.projectreactor.addons:reactor-extra 中的 CacheMono 来包装 non-reactive 缓存实现,例如 Guava Cache 或简单的 ConcurrentHashMap。它不提供“exactly-once”保证,并行请求可能会导致缓存未命中,但在许多情况下,这应该不是问题。

这里有一个 Guava 缓存的例子

public class GlobalSettingsCache {
    private final GlobalSettingsClient globalSettingsClient;
    private final Cache<String, GlobalSettings> cache;

    public GlobalSettingsCache(GlobalSettingsClient globalSettingsClient, Duration cacheTtl) {
        this.globalSettingsClient = globalSettingsClient;
        this.cache = CacheBuilder.newBuilder()
                .expireAfterWrite(cacheTtl)
                .build();
    }

    public Mono<GlobalSettings> get(String tenant) {
        return CacheMono.lookup(key -> Mono.justOrEmpty(cache.getIfPresent(key)).map(Signal::next), tenant)
                .onCacheMissResume(() -> fetchGlobalSettings(tenant))
                .andWriteWith((key, signal) -> Mono.fromRunnable(() ->
                        Optional.ofNullable(signal.get())
                                .ifPresent(value -> cache.put(key, value))));
    }

    private Mono<GlobalSettings> fetchGlobalSettings(String tenant) {
        return globalSettingsClient.getGlobalSettings(tenant);
    }
}

Project Reactor 提供了一个缓存实用程序 CacheMono,它是 non-blocking 但可以踩踏。

AsyncCache 将是更好的集成,因为第一次使用键“K”查找将导致缓存未命中,它将 return API 调用的 CompletableFuture 和第二次查找相同的键“K”将获得相同的 CompletableFuture 对象。

returned 的未来对象可以转换为 to/from Mono 和 Mono.fromFuture()

 public Mono<ResponseData> lookupAndWrite(AsyncCache<String, ResponseData> cache, String key) {
return Mono.defer(
    () ->
        Mono.fromFuture(
            cache.get(
                key,
                (searchKey, executor) -> {
                  CompletableFuture<ResponseData> future = callAPI(searchKey).toFuture();
                  return future.whenComplete(
                      (r, t) -> {
                        if (t != null) {
                          cache.synchronous().invalidate(key);
                        }
                      });
                })));}