缓存 Spring 5 WebFlux
Caching in Spring 5 WebFlux
在 Spring 5 中有什么方法可以缓存来自 WebClient 的 Flux 吗?我试过了,但没有缓存任何东西。
@RestController
@SpringBootApplication
@EnableCaching
public class GatewayApplication {
@PostMapping(value ="/test", produces = "application/json")
public Flux<String> handleRequest(@RequestBody String body) {
return getHspadQuery(body);
}
@Cacheable("testCache")
private Flux<String> getData (String body) {
return WebClient.create().post()
.uri("http://myurl")
.body(BodyInserters.fromObject(body))
.retrieve().bodyToFlux(String.class).cache();
}
}
当我提出第三个请求时,它永远不会完成。然后在后续请求中我得到响应,但服务器抛出以下内容:
2018-04-09 12:36:23.920 ERROR 11488 --- [ctor-http-nio-4] r.ipc.netty.channel.ChannelOperations : [HttpServer] Error processing connection. Requesting close the channel
reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.emit(FluxBufferPredicate.java:292) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNextNewBuffer(FluxBufferPredicate.java:251) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.tryOnNext(FluxBufferPredicate.java:205) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNext(FluxBufferPredicate.java:180) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:646) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:523) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapInner.onSubscribe(FluxFlatMap.java:897) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:128) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:61) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.Flux.subscribe(Flux.java:6873) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:372) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:108) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:108) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.ipc.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:211) ~[reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
at reactor.ipc.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:326) ~[reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
...
而且它从不缓存任何东西。
如有任何帮助,我们将不胜感激。
谢谢。
目前,@Cacheable
不适用于 Flux
(以及一般的 Reactor)。
但是对于您的示例,每次调用该方法时,您都会创建一个新的 Flux
实例,因此它自然不会缓存任何内容。
为了能够缓存结果,您需要将 Flux
转换为列表实例,或者继续重复使用一个 Flux
实例
最后我用Mono解决了。我想使用 Flux 是可能的,例如使用 reduce。
@RestController
@SpringBootApplication
public class Application {
@Autowired
CacheManager manager;
private WebClient client;
@PostConstruct
public void setup() {
client = WebClient.builder()
.baseUrl("http://myurl")
.exchangeStrategies(ExchangeStrategies.withDefaults())
.build();
}
@Bean
public CacheManager cacheManager() {
SimpleCacheManager cacheManager = new SimpleCacheManager();
cacheManager.setCaches(Arrays.asList(new ConcurrentMapCache("hspad")));
return cacheManager;
}
@PostMapping(value = "/hspad/grahql", produces = "application/json")
public Mono<ResponseEntity<String>> hspadService(@RequestBody String body) {
return getHspadQuery(body);
}
private Mono<ResponseEntity<String>> getHspadQuery (String body) {
Mono<ResponseEntity<String>> mono;
Optional<Cache.ValueWrapper> value = Optional.ofNullable(cacheManager().getCache("hspad").get(body));
if(value.isPresent()) {
mono = Mono.just(ResponseEntity.ok(value.get().get().toString()));
} else {
mono = client.post()
.body(BodyInserters.fromObject(body))
.retrieve().bodyToMono(String.class).map(response ->
{
// Care blocking operation! (use cacheManager -not found yet- prepared for reactive) cacheManager().getCache("hspad").putIfAbsent(body,response);
return ResponseEntity.ok(response);
});
}
return mono;
}
public static void main(String[] args) {
SpringApplication.run(GatewayApplication.class, args);
}
}
有一个反应器 cache add-on 可以与 Spring CacheManager 一起使用。
但是,正如已接受答案中的评论所指出的,目前,Spring 缓存 API(gets 和 puts)仍然处于阻塞状态。在解决这个问题之前,我们只能让程序完全响应。
这是 java 中的示例代码片段。
完整的示例项目是 here in github。
@Service
public class CatServiceImpl implements CatService {
private static final String CACHE_NAME = "sr";
private static final String KEY = "k";
@Autowired
private WebClient client;
@Autowired
private CacheManager cacheManager;
@SuppressWarnings("unchecked")
private Function<String, Mono<List<Signal<CatDto>>>> reader = k -> Mono
.justOrEmpty((Optional.ofNullable((List<CatDto>) (cacheManager.getCache(CACHE_NAME).get(k, List.class)))))
.flatMap(v -> Flux.fromIterable(v).materialize().collectList());
private BiFunction<String, List<Signal<CatDto>>, Mono<Void>> writer = (k, sigs) -> Flux.fromIterable(sigs)
.dematerialize().collectList().doOnNext(l -> cacheManager.getCache(CACHE_NAME).put(k, l)).then();
@Override
public Flux<CatDto> search() {
Flux<CatDto> fromServer = client.get().retrieve().bodyToFlux(CatDto.class);
return CacheFlux.lookup(reader, KEY).onCacheMissResume(fromServer).andWriteWith(writer);
}
}
我使用 reactor cache add-on 的注释和基于 aop 的 spring 反应器缓存的替代方法,直到 spring 框架的 @Cacheable 注释支持反应式缓存。
https://github.com/pkgonan/reactor-cache
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface MonoCacheable {
String value() default "";
}
abstract class AbstractSpringCache<T> {
protected Cache cache;
protected Class<T> type;
protected AbstractSpringCache(Cache cache, Class<T> type) {
this.cache = cache;
this.type = type;
}
}
public class SpringMonoCache<T> extends AbstractSpringCache<T> implements MonoCache<T> {
public SpringMonoCache(Cache cache, Class<T> type) {
super(cache, type);
}
@Override
public Mono<T> find(Mono<T> retriever, String key) {
return CacheMono.lookup(reader, key)
.onCacheMissResume(retriever)
.andWriteWith(writer);
}
/** Mono Cache reader function **/
private Function<String, Mono<Signal<? extends T>>> reader = k -> Mono
.fromCallable(() -> cache.get(k, type))
.subscribeOn(Schedulers.elastic())
.flatMap(t -> Mono.justOrEmpty(Signal.next(t)));
/** Mono Cache writer function **/
private BiFunction<String, Signal<? extends T>, Mono<Void>> writer = (k, signal) -> Mono
.fromRunnable(() -> Optional.ofNullable(signal.get())
.ifPresent(o -> cache.put(k, o)))
.subscribeOn(Schedulers.elastic())
.then();
}
@Aspect
@Component
class ReactorAnnotationCacheAspect {
...
@Around("annotationOfAnyMonoCacheable() && " +
"executionOfAnyPublicMonoMethod()")
final Object around(final ProceedingJoinPoint joinPoint) throws Throwable {
...
try {
return reactorCacheAspectSupport.execute(aspectJInvoker, method, args);
} catch(...) {}...
return joinPoint.proceed(args);
}
@Pointcut(value = "@annotation(reactor.cache.spring.annotation.MonoCacheable)")
private void annotationOfAnyMonoCacheable() {}
@Pointcut(value = "execution(public reactor.core.publisher.Mono *(..))")
private void executionOfAnyPublicMonoMethod() {}
}
class ReactorCacheAspectSupport {
private final CacheManager cacheManager;
...
Object execute(final CacheOperationInvoker invoker, final Method method, final Object[] args) {
...
return execute(cache, invoker.invoke(), key, returnType);
}
private Object execute(final Cache cache, final Object proceed, final String key, final Class<?> type) {
...
final ReactorCache cacheResolver = getCacheResolver(cache, type);
return cacheResolver.find(proceed, key);
}
...
}
在 Spring 5 中有什么方法可以缓存来自 WebClient 的 Flux 吗?我试过了,但没有缓存任何东西。
@RestController
@SpringBootApplication
@EnableCaching
public class GatewayApplication {
@PostMapping(value ="/test", produces = "application/json")
public Flux<String> handleRequest(@RequestBody String body) {
return getHspadQuery(body);
}
@Cacheable("testCache")
private Flux<String> getData (String body) {
return WebClient.create().post()
.uri("http://myurl")
.body(BodyInserters.fromObject(body))
.retrieve().bodyToFlux(String.class).cache();
}
}
当我提出第三个请求时,它永远不会完成。然后在后续请求中我得到响应,但服务器抛出以下内容:
2018-04-09 12:36:23.920 ERROR 11488 --- [ctor-http-nio-4] r.ipc.netty.channel.ChannelOperations : [HttpServer] Error processing connection. Requesting close the channel
reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.emit(FluxBufferPredicate.java:292) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNextNewBuffer(FluxBufferPredicate.java:251) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.tryOnNext(FluxBufferPredicate.java:205) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNext(FluxBufferPredicate.java:180) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:646) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:523) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapInner.onSubscribe(FluxFlatMap.java:897) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:128) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:61) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.Flux.subscribe(Flux.java:6873) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:372) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:108) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:108) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.ipc.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:211) ~[reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
at reactor.ipc.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:326) ~[reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
...
而且它从不缓存任何东西。
如有任何帮助,我们将不胜感激。
谢谢。
目前,@Cacheable
不适用于 Flux
(以及一般的 Reactor)。
但是对于您的示例,每次调用该方法时,您都会创建一个新的 Flux
实例,因此它自然不会缓存任何内容。
为了能够缓存结果,您需要将 Flux
转换为列表实例,或者继续重复使用一个 Flux
实例
最后我用Mono解决了。我想使用 Flux 是可能的,例如使用 reduce。
@RestController
@SpringBootApplication
public class Application {
@Autowired
CacheManager manager;
private WebClient client;
@PostConstruct
public void setup() {
client = WebClient.builder()
.baseUrl("http://myurl")
.exchangeStrategies(ExchangeStrategies.withDefaults())
.build();
}
@Bean
public CacheManager cacheManager() {
SimpleCacheManager cacheManager = new SimpleCacheManager();
cacheManager.setCaches(Arrays.asList(new ConcurrentMapCache("hspad")));
return cacheManager;
}
@PostMapping(value = "/hspad/grahql", produces = "application/json")
public Mono<ResponseEntity<String>> hspadService(@RequestBody String body) {
return getHspadQuery(body);
}
private Mono<ResponseEntity<String>> getHspadQuery (String body) {
Mono<ResponseEntity<String>> mono;
Optional<Cache.ValueWrapper> value = Optional.ofNullable(cacheManager().getCache("hspad").get(body));
if(value.isPresent()) {
mono = Mono.just(ResponseEntity.ok(value.get().get().toString()));
} else {
mono = client.post()
.body(BodyInserters.fromObject(body))
.retrieve().bodyToMono(String.class).map(response ->
{
// Care blocking operation! (use cacheManager -not found yet- prepared for reactive) cacheManager().getCache("hspad").putIfAbsent(body,response);
return ResponseEntity.ok(response);
});
}
return mono;
}
public static void main(String[] args) {
SpringApplication.run(GatewayApplication.class, args);
}
}
有一个反应器 cache add-on 可以与 Spring CacheManager 一起使用。 但是,正如已接受答案中的评论所指出的,目前,Spring 缓存 API(gets 和 puts)仍然处于阻塞状态。在解决这个问题之前,我们只能让程序完全响应。
这是 java 中的示例代码片段。 完整的示例项目是 here in github。
@Service
public class CatServiceImpl implements CatService {
private static final String CACHE_NAME = "sr";
private static final String KEY = "k";
@Autowired
private WebClient client;
@Autowired
private CacheManager cacheManager;
@SuppressWarnings("unchecked")
private Function<String, Mono<List<Signal<CatDto>>>> reader = k -> Mono
.justOrEmpty((Optional.ofNullable((List<CatDto>) (cacheManager.getCache(CACHE_NAME).get(k, List.class)))))
.flatMap(v -> Flux.fromIterable(v).materialize().collectList());
private BiFunction<String, List<Signal<CatDto>>, Mono<Void>> writer = (k, sigs) -> Flux.fromIterable(sigs)
.dematerialize().collectList().doOnNext(l -> cacheManager.getCache(CACHE_NAME).put(k, l)).then();
@Override
public Flux<CatDto> search() {
Flux<CatDto> fromServer = client.get().retrieve().bodyToFlux(CatDto.class);
return CacheFlux.lookup(reader, KEY).onCacheMissResume(fromServer).andWriteWith(writer);
}
}
我使用 reactor cache add-on 的注释和基于 aop 的 spring 反应器缓存的替代方法,直到 spring 框架的 @Cacheable 注释支持反应式缓存。
https://github.com/pkgonan/reactor-cache
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface MonoCacheable {
String value() default "";
}
abstract class AbstractSpringCache<T> {
protected Cache cache;
protected Class<T> type;
protected AbstractSpringCache(Cache cache, Class<T> type) {
this.cache = cache;
this.type = type;
}
}
public class SpringMonoCache<T> extends AbstractSpringCache<T> implements MonoCache<T> {
public SpringMonoCache(Cache cache, Class<T> type) {
super(cache, type);
}
@Override
public Mono<T> find(Mono<T> retriever, String key) {
return CacheMono.lookup(reader, key)
.onCacheMissResume(retriever)
.andWriteWith(writer);
}
/** Mono Cache reader function **/
private Function<String, Mono<Signal<? extends T>>> reader = k -> Mono
.fromCallable(() -> cache.get(k, type))
.subscribeOn(Schedulers.elastic())
.flatMap(t -> Mono.justOrEmpty(Signal.next(t)));
/** Mono Cache writer function **/
private BiFunction<String, Signal<? extends T>, Mono<Void>> writer = (k, signal) -> Mono
.fromRunnable(() -> Optional.ofNullable(signal.get())
.ifPresent(o -> cache.put(k, o)))
.subscribeOn(Schedulers.elastic())
.then();
}
@Aspect
@Component
class ReactorAnnotationCacheAspect {
...
@Around("annotationOfAnyMonoCacheable() && " +
"executionOfAnyPublicMonoMethod()")
final Object around(final ProceedingJoinPoint joinPoint) throws Throwable {
...
try {
return reactorCacheAspectSupport.execute(aspectJInvoker, method, args);
} catch(...) {}...
return joinPoint.proceed(args);
}
@Pointcut(value = "@annotation(reactor.cache.spring.annotation.MonoCacheable)")
private void annotationOfAnyMonoCacheable() {}
@Pointcut(value = "execution(public reactor.core.publisher.Mono *(..))")
private void executionOfAnyPublicMonoMethod() {}
}
class ReactorCacheAspectSupport {
private final CacheManager cacheManager;
...
Object execute(final CacheOperationInvoker invoker, final Method method, final Object[] args) {
...
return execute(cache, invoker.invoke(), key, returnType);
}
private Object execute(final Cache cache, final Object proceed, final String key, final Class<?> type) {
...
final ReactorCache cacheResolver = getCacheResolver(cache, type);
return cacheResolver.find(proceed, key);
}
...
}