如何在 Project Reactor 中缓存项目并避免 Cache Stampede?
How to cache items in Project Reactor and avoid a Cache Stampede?
如何执行 get/compute 非阻塞查找并避免缓存踩踏。
这里举个例子,不会踩踏,而是阻塞。
public static <KEY, VALUE> Mono<VALUE> lookupAndWrite(
Map<KEY, Signal<? extends VALUE>> cacheMap, KEY key, Mono<VALUE> mono) {
return Mono.defer(() -> Mono.just(cacheMap.computeIfAbsent(key, k ->
mono.materialize().block())).dematerialize());
}
这里有一个不会阻塞,但是可以踩踏的例子
public static <KEY, VALUE> MonoCacheBuilderCacheMiss<KEY, VALUE> lookup(
Function<KEY, Mono<Signal<? extends VALUE>>> reader, KEY key) {
return otherSupplier -> writer -> Mono.defer(() ->
reader.apply(key)
.switchIfEmpty(otherSupplier.get()
.materialize()
.flatMap(signal -> writer.apply(key, signal)
)
)
.dematerialize());
}
有没有踩不踩不挡的办法?仅在其自己的调度程序上订阅阻塞调用是否有意义?
要重新表述您的问题,您希望在允许异步执行计算的同时避免踩踏。理想情况下,这将使用 ConcurrentMap<K, Mono<V>>
和 computeIfAbsent
来完成,如果计算失败,它将丢弃该条目。
Caffeine 的 AsyncLoadingCache
通过使用 CompletableFuture<V>
作为值类型来提供这种类型的行为。您可以将阻塞函数重写为
public static <KEY, VALUE> Mono<VALUE> lookupAndWrite(
AsyncLoadingCache<KEY, VALUE> cache, KEY key, Mono<VALUE> mono) {
return Mono.defer(() -> Mono.fromFuture(cache.get(key, (k,e) ->
mono.subscribeOn(Schedulers.fromExecutor(e)).toFuture())));
}
从版本 2.6.x 开始,没有比 AsyncCache
更简单的听取反馈的方法了,它将出现在 2.7 版本中。这还将包括一个 ConcurrentMap<K, CompletableFuture<V>>
视图,它可以让您将您的方法概括为没有特定于提供者的接口。现在,您可以通过避免加载方法并使用 Caffeine.newBuilder().buildAsync(key -> null)
.
来模拟非加载缓存
如何执行 get/compute 非阻塞查找并避免缓存踩踏。
这里举个例子,不会踩踏,而是阻塞。
public static <KEY, VALUE> Mono<VALUE> lookupAndWrite(
Map<KEY, Signal<? extends VALUE>> cacheMap, KEY key, Mono<VALUE> mono) {
return Mono.defer(() -> Mono.just(cacheMap.computeIfAbsent(key, k ->
mono.materialize().block())).dematerialize());
}
这里有一个不会阻塞,但是可以踩踏的例子
public static <KEY, VALUE> MonoCacheBuilderCacheMiss<KEY, VALUE> lookup(
Function<KEY, Mono<Signal<? extends VALUE>>> reader, KEY key) {
return otherSupplier -> writer -> Mono.defer(() ->
reader.apply(key)
.switchIfEmpty(otherSupplier.get()
.materialize()
.flatMap(signal -> writer.apply(key, signal)
)
)
.dematerialize());
}
有没有踩不踩不挡的办法?仅在其自己的调度程序上订阅阻塞调用是否有意义?
要重新表述您的问题,您希望在允许异步执行计算的同时避免踩踏。理想情况下,这将使用 ConcurrentMap<K, Mono<V>>
和 computeIfAbsent
来完成,如果计算失败,它将丢弃该条目。
Caffeine 的 AsyncLoadingCache
通过使用 CompletableFuture<V>
作为值类型来提供这种类型的行为。您可以将阻塞函数重写为
public static <KEY, VALUE> Mono<VALUE> lookupAndWrite(
AsyncLoadingCache<KEY, VALUE> cache, KEY key, Mono<VALUE> mono) {
return Mono.defer(() -> Mono.fromFuture(cache.get(key, (k,e) ->
mono.subscribeOn(Schedulers.fromExecutor(e)).toFuture())));
}
从版本 2.6.x 开始,没有比 AsyncCache
更简单的听取反馈的方法了,它将出现在 2.7 版本中。这还将包括一个 ConcurrentMap<K, CompletableFuture<V>>
视图,它可以让您将您的方法概括为没有特定于提供者的接口。现在,您可以通过避免加载方法并使用 Caffeine.newBuilder().buildAsync(key -> null)
.