为响应式管道编写方面
Writing Aspects For Reactive Pipelines
我正在为 return 承诺的方法编写方面。考虑以下方法:
public Mono<Stream> publishToKafka(Stream s) {
//publishToKafka is asynchronous
return Mono.just(s).flatMap(worker::publishToKafka);
}
我想缓存发布是否成功。因为这是一个横切关注点,所以 Aspect 看起来是最好的设计。这是我的看点。
@Around("@annotation....")
public Object cache() {
//get the data to cache from the annotation
Object result = pjp.proceed();
cache.cache("key","data");
return result;
}
现在由于 publishToKafka
是异步的,目标方法 return 会在线程切换发生并调用 cache.cache()
时立即执行。这不是我想要的。我想要的是,如果事件已成功发布到 Kafka,则应该缓存结果。以下建议有效。
@Around("@annotation....")
public <T extends Stream<T>> Mono<T> cache() {
//get the data to cache from the annotation
return ((Mono<T>)pjp.proceed()).doOnNext(a -> cache.cache(key, data));
}
我想了解这里发生了什么。这是否发生在管道的组装期间?或者在我的建议添加 doOnNext
运算符的执行时间(pjp.proceed()
return 是一个承诺)?
我需要了解此示例上下文中的汇编时间与执行时间。
Spring AOP 和 AspectJ 方面总是在与拦截的连接点相同的线程中同步执行。因此,如果您立即截获的方法 returns 和 return 值类似于承诺、未来或无(无效)与回调相结合,您不能期望神奇地获得异步结果方面的建议。您确实需要让方面知道异步情况。
说到这里,我还要提一下,我以前从未使用过响应式编程,我只知道概念。从我在你的建议中看到的情况来看,解决方案应该可行,但有一件事不太好:你提出建议 return a new Mono
instance returned by your doOnNext(..)
call。也许 return original Mono
你在注册缓存回调后从 proceed()
得到的 return 会更干净,只是为了避免任何副作用。
不知道还要解释什么,情况已经很清楚了。如果我的解释不够充分,请随时提出直接相关的后续问题。
我正在为 return 承诺的方法编写方面。考虑以下方法:
public Mono<Stream> publishToKafka(Stream s) {
//publishToKafka is asynchronous
return Mono.just(s).flatMap(worker::publishToKafka);
}
我想缓存发布是否成功。因为这是一个横切关注点,所以 Aspect 看起来是最好的设计。这是我的看点。
@Around("@annotation....")
public Object cache() {
//get the data to cache from the annotation
Object result = pjp.proceed();
cache.cache("key","data");
return result;
}
现在由于 publishToKafka
是异步的,目标方法 return 会在线程切换发生并调用 cache.cache()
时立即执行。这不是我想要的。我想要的是,如果事件已成功发布到 Kafka,则应该缓存结果。以下建议有效。
@Around("@annotation....")
public <T extends Stream<T>> Mono<T> cache() {
//get the data to cache from the annotation
return ((Mono<T>)pjp.proceed()).doOnNext(a -> cache.cache(key, data));
}
我想了解这里发生了什么。这是否发生在管道的组装期间?或者在我的建议添加 doOnNext
运算符的执行时间(pjp.proceed()
return 是一个承诺)?
我需要了解此示例上下文中的汇编时间与执行时间。
Spring AOP 和 AspectJ 方面总是在与拦截的连接点相同的线程中同步执行。因此,如果您立即截获的方法 returns 和 return 值类似于承诺、未来或无(无效)与回调相结合,您不能期望神奇地获得异步结果方面的建议。您确实需要让方面知道异步情况。
说到这里,我还要提一下,我以前从未使用过响应式编程,我只知道概念。从我在你的建议中看到的情况来看,解决方案应该可行,但有一件事不太好:你提出建议 return a new Mono
instance returned by your doOnNext(..)
call。也许 return original Mono
你在注册缓存回调后从 proceed()
得到的 return 会更干净,只是为了避免任何副作用。
不知道还要解释什么,情况已经很清楚了。如果我的解释不够充分,请随时提出直接相关的后续问题。