在 Spring Webflux Mono 中缓存并行请求
Caching parallel request in Spring Webflux Mono
我们正在使用 spring webflux(项目反应器),作为要求的一部分,我们需要从我们的服务器调用一个 API。
对于 API 调用,我们需要缓存响应。所以我们使用 Mono.cache
运算符。
它缓存响应 Mono<ResponseDto>
并且下次发生相同的 API 调用时,它将从缓存中获取它。以下是示例实现
public Mono<ResponseDto> getResponse() {
if (res == null) {
res =
fetchResponse()
.onErrorMap(Exception.class, (error) -> new CustomException())
.cache(
r -> Duration.ofSeconds(r.expiresIn()),
error -> Duration.ZERO,
() -> Duration.ZERO);
}
return res;
}
问题是如果服务器同时调用相同的 API 调用两次(例如 Mono.zip
),那么响应不会被缓存,我们实际上会调用它两次。
这个问题有现成的解决方案吗?除了缓存 Response,我们是否可以缓存 Mono 本身,以便两个请求都订阅同一个 Mono,因此两个请求都在单个 API 调用响应之后执行?
它也应该适用于顺序执行 - 恐怕如果我们缓存 Mono,那么一旦请求完成,订阅就结束了,没有其他进程可以订阅它。
您可以在构造函数中初始化 Mono
(假设它不依赖于任何请求时间参数)。使用 cache
运算符将阻止对源的多个订阅。
class MyService {
private final Mono<ResponseBodyDto> response;
public MyService() {
response = fetchResponse()
.onErrorMap(Exception.class, (error) -> new CustomException())
.cache(
r -> Duration.ofSeconds(r.expiresIn()),
error -> Duration.ZERO,
() -> Duration.ZERO);
}
public Mono<ResponseDto> getResponse() {
return response;
}
}
如果对请求时间参数有依赖性,您应该考虑一些自定义缓存解决方案。
您可以使用 io.projectreactor.addons:reactor-extra
中的 CacheMono
来包装 non-reactive 缓存实现,例如 Guava Cache 或简单的 ConcurrentHashMap
。它不提供“exactly-once”保证,并行请求可能会导致缓存未命中,但在许多情况下,这应该不是问题。
这里有一个 Guava 缓存的例子
public class GlobalSettingsCache {
private final GlobalSettingsClient globalSettingsClient;
private final Cache<String, GlobalSettings> cache;
public GlobalSettingsCache(GlobalSettingsClient globalSettingsClient, Duration cacheTtl) {
this.globalSettingsClient = globalSettingsClient;
this.cache = CacheBuilder.newBuilder()
.expireAfterWrite(cacheTtl)
.build();
}
public Mono<GlobalSettings> get(String tenant) {
return CacheMono.lookup(key -> Mono.justOrEmpty(cache.getIfPresent(key)).map(Signal::next), tenant)
.onCacheMissResume(() -> fetchGlobalSettings(tenant))
.andWriteWith((key, signal) -> Mono.fromRunnable(() ->
Optional.ofNullable(signal.get())
.ifPresent(value -> cache.put(key, value))));
}
private Mono<GlobalSettings> fetchGlobalSettings(String tenant) {
return globalSettingsClient.getGlobalSettings(tenant);
}
}
Project Reactor 提供了一个缓存实用程序 CacheMono,它是 non-blocking 但可以踩踏。
AsyncCache 将是更好的集成,因为第一次使用键“K”查找将导致缓存未命中,它将 return API 调用的 CompletableFuture 和第二次查找相同的键“K”将获得相同的 CompletableFuture 对象。
returned 的未来对象可以转换为 to/from Mono 和 Mono.fromFuture()
public Mono<ResponseData> lookupAndWrite(AsyncCache<String, ResponseData> cache, String key) {
return Mono.defer(
() ->
Mono.fromFuture(
cache.get(
key,
(searchKey, executor) -> {
CompletableFuture<ResponseData> future = callAPI(searchKey).toFuture();
return future.whenComplete(
(r, t) -> {
if (t != null) {
cache.synchronous().invalidate(key);
}
});
})));}
我们正在使用 spring webflux(项目反应器),作为要求的一部分,我们需要从我们的服务器调用一个 API。
对于 API 调用,我们需要缓存响应。所以我们使用 Mono.cache
运算符。
它缓存响应 Mono<ResponseDto>
并且下次发生相同的 API 调用时,它将从缓存中获取它。以下是示例实现
public Mono<ResponseDto> getResponse() {
if (res == null) {
res =
fetchResponse()
.onErrorMap(Exception.class, (error) -> new CustomException())
.cache(
r -> Duration.ofSeconds(r.expiresIn()),
error -> Duration.ZERO,
() -> Duration.ZERO);
}
return res;
}
问题是如果服务器同时调用相同的 API 调用两次(例如 Mono.zip
),那么响应不会被缓存,我们实际上会调用它两次。
这个问题有现成的解决方案吗?除了缓存 Response,我们是否可以缓存 Mono 本身,以便两个请求都订阅同一个 Mono,因此两个请求都在单个 API 调用响应之后执行?
它也应该适用于顺序执行 - 恐怕如果我们缓存 Mono,那么一旦请求完成,订阅就结束了,没有其他进程可以订阅它。
您可以在构造函数中初始化 Mono
(假设它不依赖于任何请求时间参数)。使用 cache
运算符将阻止对源的多个订阅。
class MyService {
private final Mono<ResponseBodyDto> response;
public MyService() {
response = fetchResponse()
.onErrorMap(Exception.class, (error) -> new CustomException())
.cache(
r -> Duration.ofSeconds(r.expiresIn()),
error -> Duration.ZERO,
() -> Duration.ZERO);
}
public Mono<ResponseDto> getResponse() {
return response;
}
}
如果对请求时间参数有依赖性,您应该考虑一些自定义缓存解决方案。
您可以使用 io.projectreactor.addons:reactor-extra
中的 CacheMono
来包装 non-reactive 缓存实现,例如 Guava Cache 或简单的 ConcurrentHashMap
。它不提供“exactly-once”保证,并行请求可能会导致缓存未命中,但在许多情况下,这应该不是问题。
这里有一个 Guava 缓存的例子
public class GlobalSettingsCache {
private final GlobalSettingsClient globalSettingsClient;
private final Cache<String, GlobalSettings> cache;
public GlobalSettingsCache(GlobalSettingsClient globalSettingsClient, Duration cacheTtl) {
this.globalSettingsClient = globalSettingsClient;
this.cache = CacheBuilder.newBuilder()
.expireAfterWrite(cacheTtl)
.build();
}
public Mono<GlobalSettings> get(String tenant) {
return CacheMono.lookup(key -> Mono.justOrEmpty(cache.getIfPresent(key)).map(Signal::next), tenant)
.onCacheMissResume(() -> fetchGlobalSettings(tenant))
.andWriteWith((key, signal) -> Mono.fromRunnable(() ->
Optional.ofNullable(signal.get())
.ifPresent(value -> cache.put(key, value))));
}
private Mono<GlobalSettings> fetchGlobalSettings(String tenant) {
return globalSettingsClient.getGlobalSettings(tenant);
}
}
Project Reactor 提供了一个缓存实用程序 CacheMono,它是 non-blocking 但可以踩踏。
AsyncCache 将是更好的集成,因为第一次使用键“K”查找将导致缓存未命中,它将 return API 调用的 CompletableFuture 和第二次查找相同的键“K”将获得相同的 CompletableFuture 对象。
returned 的未来对象可以转换为 to/from Mono 和 Mono.fromFuture()
public Mono<ResponseData> lookupAndWrite(AsyncCache<String, ResponseData> cache, String key) {
return Mono.defer(
() ->
Mono.fromFuture(
cache.get(
key,
(searchKey, executor) -> {
CompletableFuture<ResponseData> future = callAPI(searchKey).toFuture();
return future.whenComplete(
(r, t) -> {
if (t != null) {
cache.synchronous().invalidate(key);
}
});
})));}