如何在 Project Reactor 中缓存项目并避免 Cache Stampede?

How to cache items in Project Reactor and avoid a Cache Stampede?

如何执行 get/compute 非阻塞查找并避免缓存踩踏。

这里举个例子,不会踩踏,而是阻塞。

public static <KEY, VALUE> Mono<VALUE> lookupAndWrite(
    Map<KEY, Signal<? extends VALUE>> cacheMap, KEY key, Mono<VALUE> mono) {
    return Mono.defer(() -> Mono.just(cacheMap.computeIfAbsent(key, k -> 
        mono.materialize().block())).dematerialize());
}

这里有一个不会阻塞,但是可以踩踏的例子

public static <KEY, VALUE> MonoCacheBuilderCacheMiss<KEY, VALUE> lookup(
        Function<KEY, Mono<Signal<? extends VALUE>>> reader, KEY key) {
    return otherSupplier -> writer -> Mono.defer(() ->
            reader.apply(key)
                    .switchIfEmpty(otherSupplier.get()
                            .materialize()
                            .flatMap(signal -> writer.apply(key, signal)
                            )
                    )
                    .dematerialize());
}

有没有踩不踩不挡的办法?仅在其自己的调度程序上订阅阻塞调用是否有意义?

要重新表述您的问题,您希望在允许异步执行计算的同时避免踩踏。理想情况下,这将使用 ConcurrentMap<K, Mono<V>>computeIfAbsent 来完成,如果计算失败,它将丢弃该条目。

Caffeine 的 AsyncLoadingCache 通过使用 CompletableFuture<V> 作为值类型来提供这种类型的行为。您可以将阻塞函数重写为

public static <KEY, VALUE> Mono<VALUE> lookupAndWrite(
    AsyncLoadingCache<KEY, VALUE> cache, KEY key, Mono<VALUE> mono) {
  return Mono.defer(() -> Mono.fromFuture(cache.get(key, (k,e) ->
            mono.subscribeOn(Schedulers.fromExecutor(e)).toFuture())));
}

从版本 2.6.x 开始,没有比 AsyncCache 更简单的听取反馈的方法了,它将出现在 2.7 版本中。这还将包括一个 ConcurrentMap<K, CompletableFuture<V>> 视图,它可以让您将您的方法概括为没有特定于提供者的接口。现在,您可以通过避免加载方法并使用 Caffeine.newBuilder().buildAsync(key -> null).

来模拟非加载缓存