Spring boot 2.0.2,使用 Aop 拦截 Cloud Stream 注释不再起作用
Spring boot 2.0.2, interception of Cloud Stream annotations with Aop not working anymore
我尽量让标题简单明了。
基本上,我需要拦截云流的@Input 和@Output 注释的使用。这是自动将特定 ChannelInterceptor 添加到每个 MessageChannel 所必需的。 (preSend 方法中的行为会根据消息的生产和消费略有不同)
例如,如果我声明这个建议
@Around("@annotation(org.springframework.cloud.stream.annotation.Input)")
public Object interceptInput(ProceedingJoinPoint joinPoint) throws Throwable {
LOG.debug("Intercepted @Input from method : {}", joinPoint.getSignature());
Object returnValue = null;
try {
returnValue = joinPoint.proceed();
ChannelInterceptorManager.addInputInterceptor((AbstractMessageChannel)returnValue);
} catch (Exception e) {
LOG.warn("@Input error", e);
}
return returnValue;
}
我声明这个例子class
@EnableBinding(Sink.class)
@Component
public class MyClass {
@StreamListener(Sink.INPUT)
public void handle(Object message) {
// preSend has been called before this method
}
}
这在 Spring Boot 2.0.1 上工作得很好,但在 Spring Boot 2.0.2 上却不行,我很难理解为什么。
我没有尝试过其他 Cloud stream 的注释,但基本的 Aop 工作正常。
请记住,这是要在 JAR 中使用的,因此我事先不知道将要使用的 classes 或频道名称,我需要它是自动的并且对开发者透明。
谢谢!
编辑:如果阅读这篇文章的人不熟悉云流,Sink 接口声明了一个用@Input 注释的方法,因此启用绑定就可以了。
不确定 boot 2.0.1 和 2.0.2 之间发生了什么,但上面的内容听起来像是做一些简单事情的相当复杂的方法。为什么不直接注册 BPP,您可以在初始化期间添加 pre/post 通道拦截器。
因此,BPP 并没有完全解决问题,因为我需要区分使用@Input 创建的MessageChannel 和使用@Output 创建的MessageChannel。 MessageChannel bean 不携带此信息。这就是为什么我首先使用 Aop 来分别拦截这两个注释。
洞察力:我也考虑过将@GlobalChannelInterceptor 与包含"input" 或"output" 的模式一起使用,但这意味着对最终用户强制执行此类模式。我将此解决方案保留为最后的手段,但我希望在使用罐子时此过程完全不可见且影响较小。这就是 AOP 派上用场的地方,但 2.0.2 的这种新行为对我来说肯定是有问题的。
编辑:因此版本更改的问题是 bean 初始化顺序,对于任何与 Spring boot 2.0.2 有类似问题的人。如果你能控制你需要的每一个bean,我建议你看看@DependsOn。
最终,我按照@Oleg Zhurakousky 的建议,通过使用 BeanPostProcessor 而不是 AOP 将输入与输出分开来解决了我的具体问题。
下面是一个工作方法:
@Autowired
private AbstractBeanFactory beanFactory;
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof AbstractMessageChannel) {
try {
RootBeanDefinition beanDefinition = (RootBeanDefinition) beanFactory.getMergedBeanDefinition(beanName);
Method method = beanDefinition.getResolvedFactoryMethod();
if (method != null) {
if (AnnotationUtils.findAnnotation(method, Input.class) != null) {
((AbstractMessageChannel)bean).addInterceptor(/*Your input ChannelInterceptor*/);
} else if (AnnotationUtils.findAnnotation(method, Output.class) != null) {
((AbstractMessageChannel)bean).addInterceptor(/*Your output ChannelInterceptor*/);
}
}
} catch (Exception e) {
// exception can be thrown by the bean factory
}
}
return bean;
}
我尽量让标题简单明了。
基本上,我需要拦截云流的@Input 和@Output 注释的使用。这是自动将特定 ChannelInterceptor 添加到每个 MessageChannel 所必需的。 (preSend 方法中的行为会根据消息的生产和消费略有不同)
例如,如果我声明这个建议
@Around("@annotation(org.springframework.cloud.stream.annotation.Input)")
public Object interceptInput(ProceedingJoinPoint joinPoint) throws Throwable {
LOG.debug("Intercepted @Input from method : {}", joinPoint.getSignature());
Object returnValue = null;
try {
returnValue = joinPoint.proceed();
ChannelInterceptorManager.addInputInterceptor((AbstractMessageChannel)returnValue);
} catch (Exception e) {
LOG.warn("@Input error", e);
}
return returnValue;
}
我声明这个例子class
@EnableBinding(Sink.class)
@Component
public class MyClass {
@StreamListener(Sink.INPUT)
public void handle(Object message) {
// preSend has been called before this method
}
}
这在 Spring Boot 2.0.1 上工作得很好,但在 Spring Boot 2.0.2 上却不行,我很难理解为什么。
我没有尝试过其他 Cloud stream 的注释,但基本的 Aop 工作正常。
请记住,这是要在 JAR 中使用的,因此我事先不知道将要使用的 classes 或频道名称,我需要它是自动的并且对开发者透明。
谢谢!
编辑:如果阅读这篇文章的人不熟悉云流,Sink 接口声明了一个用@Input 注释的方法,因此启用绑定就可以了。
不确定 boot 2.0.1 和 2.0.2 之间发生了什么,但上面的内容听起来像是做一些简单事情的相当复杂的方法。为什么不直接注册 BPP,您可以在初始化期间添加 pre/post 通道拦截器。
因此,BPP 并没有完全解决问题,因为我需要区分使用@Input 创建的MessageChannel 和使用@Output 创建的MessageChannel。 MessageChannel bean 不携带此信息。这就是为什么我首先使用 Aop 来分别拦截这两个注释。
洞察力:我也考虑过将@GlobalChannelInterceptor 与包含"input" 或"output" 的模式一起使用,但这意味着对最终用户强制执行此类模式。我将此解决方案保留为最后的手段,但我希望在使用罐子时此过程完全不可见且影响较小。这就是 AOP 派上用场的地方,但 2.0.2 的这种新行为对我来说肯定是有问题的。
编辑:因此版本更改的问题是 bean 初始化顺序,对于任何与 Spring boot 2.0.2 有类似问题的人。如果你能控制你需要的每一个bean,我建议你看看@DependsOn。
最终,我按照@Oleg Zhurakousky 的建议,通过使用 BeanPostProcessor 而不是 AOP 将输入与输出分开来解决了我的具体问题。 下面是一个工作方法:
@Autowired
private AbstractBeanFactory beanFactory;
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof AbstractMessageChannel) {
try {
RootBeanDefinition beanDefinition = (RootBeanDefinition) beanFactory.getMergedBeanDefinition(beanName);
Method method = beanDefinition.getResolvedFactoryMethod();
if (method != null) {
if (AnnotationUtils.findAnnotation(method, Input.class) != null) {
((AbstractMessageChannel)bean).addInterceptor(/*Your input ChannelInterceptor*/);
} else if (AnnotationUtils.findAnnotation(method, Output.class) != null) {
((AbstractMessageChannel)bean).addInterceptor(/*Your output ChannelInterceptor*/);
}
}
} catch (Exception e) {
// exception can be thrown by the bean factory
}
}
return bean;
}