Spring Webflux 和@Cacheable - 缓存 Mono / Flux 类型结果的正确方法
Spring Webflux and @Cacheable - proper way of caching result of Mono / Flux type
我正在学习 Spring WebFlux,在编写示例应用程序期间,我发现了一个与反应类型 (Mono/Flux) 结合 Spring 缓存相关的问题。
考虑以下代码片段(在 Kotlin 中):
@Repository
interface TaskRepository : ReactiveMongoRepository<Task, String>
@Service
class TaskService(val taskRepository: TaskRepository) {
@Cacheable("tasks")
fun get(id: String): Mono<Task> = taskRepository.findById(id)
}
这种缓存返回 Mono 或 Flux 的方法调用的方法是否有效且安全?也许还有其他一些原则可以做到这一点?
以下代码可与 SimpleCacheResolver 一起使用,但默认情况下无法与 Redis 一起使用,因为 Mono 不可序列化。为了使它们工作,例如需要使用 Kryo 序列化程序。
破解方式
目前,@Cacheable
与 Reactor 3 没有流畅的集成。
但是,您可以通过将 .cache()
运算符添加到 returned Mono
来绕过那个东西
@Repository
interface TaskRepository : ReactiveMongoRepository<Task, String>
@Service
class TaskService(val taskRepository: TaskRepository) {
@Cacheable("tasks")
fun get(id: String): Mono<Task> = taskRepository.findById(id).cache()
}
hack 缓存和共享 return 从 taskRepository
数据中编辑。反过来,spring 可缓存将缓存 returned Mono
的引用,然后将 return 该引用。换句话说,它是一个缓存 mono 的缓存 :).
Reactor 插件方式
有一个 addition to Reactor 3 which allows fluent integration with modern in-memory caches like caffeine, jcache,等等。使用该技术,您将能够轻松地缓存您的数据:
@Repository
interface TaskRepository : ReactiveMongoRepository<Task, String>
@Service
class TaskService(val taskRepository: TaskRepository) {
@Autowire
CacheManager manager;
fun get(id: String): Mono<Task> = CacheMono.lookup(reader(), id)
.onCacheMissResume(() -> taskRepository.findById(id))
.andWriteWith(writer());
fun reader(): CacheMono.MonoCacheReader<String, Task> = key -> Mono.<Signal<Task>>justOrEmpty((Signal) manager.getCache("tasks").get(key).get())
fun writer(): CacheMono.MonoCacheWriter<String, Task> = (key, value) -> Mono.fromRunnable(() -> manager.getCache("tasks").put(key, value));
}
Note: Reactor addons caching own abstraction which is Signal<T>
, so, do not worry about that and following that convention
我用过 Oleh Dokuka 的 hacky 解决方案,效果很好,但有一个问题。您必须在 Flux 缓存中使用比 Cachable 缓存 timetolive 值更大的持续时间。如果您不为 Flux 缓存使用持续时间,它不会使其失效(Flux 文档说 "Turn this Flux into a hot source and cache last emitted signals for further Subscriber.")。
所以使 Flux 缓存 2 分钟和 timetolive 30 秒可以是有效的配置。如果首先发生 ehcahce 超时,则会生成一个新的 Flux 缓存引用并将使用它。
// 在门面中:
public Mono<HybrisResponse> getProducts(HybrisRequest request) {
return Mono.just(HybrisResponse.builder().build());
}
// 在服务层:
@Cacheable(cacheNames = "embarkations")
public HybrisResponse cacheable(HybrisRequest request) {
LOGGER.info("executing cacheable");
return null;
}
@CachePut(cacheNames = "embarkations")
public HybrisResponse cachePut(HybrisRequest request) {
LOGGER.info("executing cachePut");
return hybrisFacade.getProducts(request).block();
}
// 在控制器中:
HybrisResponse hybrisResponse = null;
try {
// get from cache
hybrisResponse = productFeederService.cacheable(request);
} catch (Throwable e) {
// if not in cache then cache it
hybrisResponse = productFeederService.cachePut(request);
}
return Mono.just(hybrisResponse)
.map(result -> ResponseBody.<HybrisResponse>builder()
.payload(result).build())
.map(ResponseEntity::ok);
我正在学习 Spring WebFlux,在编写示例应用程序期间,我发现了一个与反应类型 (Mono/Flux) 结合 Spring 缓存相关的问题。
考虑以下代码片段(在 Kotlin 中):
@Repository
interface TaskRepository : ReactiveMongoRepository<Task, String>
@Service
class TaskService(val taskRepository: TaskRepository) {
@Cacheable("tasks")
fun get(id: String): Mono<Task> = taskRepository.findById(id)
}
这种缓存返回 Mono 或 Flux 的方法调用的方法是否有效且安全?也许还有其他一些原则可以做到这一点?
以下代码可与 SimpleCacheResolver 一起使用,但默认情况下无法与 Redis 一起使用,因为 Mono 不可序列化。为了使它们工作,例如需要使用 Kryo 序列化程序。
破解方式
目前,@Cacheable
与 Reactor 3 没有流畅的集成。
但是,您可以通过将 .cache()
运算符添加到 returned Mono
@Repository
interface TaskRepository : ReactiveMongoRepository<Task, String>
@Service
class TaskService(val taskRepository: TaskRepository) {
@Cacheable("tasks")
fun get(id: String): Mono<Task> = taskRepository.findById(id).cache()
}
hack 缓存和共享 return 从 taskRepository
数据中编辑。反过来,spring 可缓存将缓存 returned Mono
的引用,然后将 return 该引用。换句话说,它是一个缓存 mono 的缓存 :).
Reactor 插件方式
有一个 addition to Reactor 3 which allows fluent integration with modern in-memory caches like caffeine, jcache,等等。使用该技术,您将能够轻松地缓存您的数据:
@Repository
interface TaskRepository : ReactiveMongoRepository<Task, String>
@Service
class TaskService(val taskRepository: TaskRepository) {
@Autowire
CacheManager manager;
fun get(id: String): Mono<Task> = CacheMono.lookup(reader(), id)
.onCacheMissResume(() -> taskRepository.findById(id))
.andWriteWith(writer());
fun reader(): CacheMono.MonoCacheReader<String, Task> = key -> Mono.<Signal<Task>>justOrEmpty((Signal) manager.getCache("tasks").get(key).get())
fun writer(): CacheMono.MonoCacheWriter<String, Task> = (key, value) -> Mono.fromRunnable(() -> manager.getCache("tasks").put(key, value));
}
Note: Reactor addons caching own abstraction which is
Signal<T>
, so, do not worry about that and following that convention
我用过 Oleh Dokuka 的 hacky 解决方案,效果很好,但有一个问题。您必须在 Flux 缓存中使用比 Cachable 缓存 timetolive 值更大的持续时间。如果您不为 Flux 缓存使用持续时间,它不会使其失效(Flux 文档说 "Turn this Flux into a hot source and cache last emitted signals for further Subscriber.")。 所以使 Flux 缓存 2 分钟和 timetolive 30 秒可以是有效的配置。如果首先发生 ehcahce 超时,则会生成一个新的 Flux 缓存引用并将使用它。
// 在门面中:
public Mono<HybrisResponse> getProducts(HybrisRequest request) {
return Mono.just(HybrisResponse.builder().build());
}
// 在服务层:
@Cacheable(cacheNames = "embarkations")
public HybrisResponse cacheable(HybrisRequest request) {
LOGGER.info("executing cacheable");
return null;
}
@CachePut(cacheNames = "embarkations")
public HybrisResponse cachePut(HybrisRequest request) {
LOGGER.info("executing cachePut");
return hybrisFacade.getProducts(request).block();
}
// 在控制器中:
HybrisResponse hybrisResponse = null;
try {
// get from cache
hybrisResponse = productFeederService.cacheable(request);
} catch (Throwable e) {
// if not in cache then cache it
hybrisResponse = productFeederService.cachePut(request);
}
return Mono.just(hybrisResponse)
.map(result -> ResponseBody.<HybrisResponse>builder()
.payload(result).build())
.map(ResponseEntity::ok);