针对使用 jmslistener 注释的方法的模式验证原始消息
validate raw message against schema for method annotated with jmslistener
我需要对所有 jms 侦听器应用一些 pre-checks 和常见步骤,例如根据模式(JSON 模式)验证原始消息。例子-
@Component
public class MyService {
@JmsListener(destination = "myDestination")
public void processOrder(Order order) { ... }
}
现在,在 spring 将消息从 queue 转换为订单之前,我需要执行以下操作 -
- 使用 headers 将原始消息记录到自定义记录器中。
- 根据 json 模式验证 json 消息(文本消息)(为简单起见,假设我这里只有一个模式)
- 如果模式验证失败,记录错误并抛出异常
- 如果schema验证通过,则继续控制到spring做转换,然后继续process order方法。
springJMS 架构是否提供任何方式来注入上述需求?
我知道 AOP 闪过脑海,但我不确定它是否适用于@JmsListener。
一个相当简单的技术是在侦听器容器工厂上将 autoStartup
设置为 false
。
然后,使用 JmsListenerEndpointRegistry
bean 获取侦听器容器。
然后getMessageListener()
,将其包装在AOP代理中,然后setMessageListener()
。
然后启动容器。
可能有更优雅的方法,但我认为您必须进入侦听器创建代码的内部,这非常复杂。
编辑
Spring 引导示例:
@SpringBootApplication
public class So49682934Application {
private final Logger logger = LoggerFactory.getLogger(getClass());
public static void main(String[] args) {
SpringApplication.run(So49682934Application.class, args);
}
@JmsListener(id = "listener1", destination = "so49682934")
public void listen(Foo foo) {
logger.info(foo.toString());
}
@Bean
public ApplicationRunner runner(JmsListenerEndpointRegistry registry, JmsTemplate template) {
return args -> {
DefaultMessageListenerContainer container =
(DefaultMessageListenerContainer) registry.getListenerContainer("listener1");
Object listener = container.getMessageListener();
ProxyFactory pf = new ProxyFactory(listener);
NameMatchMethodPointcutAdvisor advisor = new NameMatchMethodPointcutAdvisor(new MyJmsInterceptor());
advisor.addMethodName("onMessage");
pf.addAdvisor(advisor);
container.setMessageListener(pf.getProxy());
registry.start();
Thread.sleep(5_000);
Foo foo = new Foo("baz");
template.convertAndSend("so49682934", foo);
};
}
@Bean
public MessageConverter converter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("typeId");
return converter;
}
public static class MyJmsInterceptor implements MethodInterceptor {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
Message message = (Message) invocation.getArguments()[0];
logger.info(message.toString());
// validate
return invocation.proceed();
}
}
public static class Foo {
private String bar;
public Foo() {
super();
}
public Foo(String bar) {
this.bar = bar;
}
public String getBar() {
return this.bar;
}
public void setBar(String bar) {
this.bar = bar;
}
@Override
public String toString() {
return "Foo [bar=" + this.bar + "]";
}
}
}
和
spring.jms.listener.auto-startup=false
和
m2018-04-06 11:42:04.859 INFO 59745 --- [enerContainer-1] e.So49682934Application$MyJmsInterceptor : ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:gollum.local-60138-1523029319662-4:2:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:gollum.local-60138-1523029319662-4:2:1:1, destination = queue://so49682934, transactionId = null, expiration = 0, timestamp = 1523029324849, arrival = 0, brokerInTime = 1523029324849, brokerOutTime = 1523029324853, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1050, properties = {typeId=com.example.So49682934Application$Foo}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = {"bar":"baz"}}
2018-04-06 11:42:04.882 INFO 59745 --- [enerContainer-1] ication$$EnhancerBySpringCGLIB$$e29327b8 : Foo [bar=baz]
EDIT2
以下是如何通过基础设施来实现...
@SpringBootApplication
@EnableJms
public class So496829341Application {
private final Logger logger = LoggerFactory.getLogger(getClass());
public static void main(String[] args) {
SpringApplication.run(So496829341Application.class, args);
}
@JmsListener(id = "listen1", destination="so496829341")
public void listen(Foo foo) {
logger.info(foo.toString());
}
@Bean
public ApplicationRunner runner(JmsTemplate template) {
return args -> {
Thread.sleep(5_000);
template.convertAndSend("so496829341", new Foo("baz"));
};
}
@Bean
public MessageConverter converter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("typeId");
return converter;
}
@Bean(JmsListenerConfigUtils.JMS_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
public static JmsListenerAnnotationBeanPostProcessor bpp() {
return new JmsListenerAnnotationBeanPostProcessor() {
@Override
protected MethodJmsListenerEndpoint createMethodJmsListenerEndpoint() {
return new MethodJmsListenerEndpoint() {
@Override
protected MessagingMessageListenerAdapter createMessageListener(
MessageListenerContainer container) {
MessagingMessageListenerAdapter listener = super.createMessageListener(container);
ProxyFactory pf = new ProxyFactory(listener);
pf.setProxyTargetClass(true);
NameMatchMethodPointcutAdvisor advisor = new NameMatchMethodPointcutAdvisor(new MyJmsInterceptor());
advisor.addMethodName("onMessage");
pf.addAdvisor(advisor);
return (MessagingMessageListenerAdapter) pf.getProxy();
}
};
}
};
}
public static class MyJmsInterceptor implements MethodInterceptor {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
Message message = (Message) invocation.getArguments()[0];
logger.info(message.toString());
// validate
return invocation.proceed();
}
}
public static class Foo {
private String bar;
public Foo() {
super();
}
public Foo(String bar) {
this.bar = bar;
}
public String getBar() {
return this.bar;
}
public void setBar(String bar) {
this.bar = bar;
}
@Override
public String toString() {
return "Foo [bar=" + this.bar + "]";
}
}
}
注意:BPP 必须是静态的,并且需要 @EnableJms
,因为此 BPP 的存在会禁用引导。
2018-04-06 13:44:41.607 INFO 82669 --- [enerContainer-1] .So496829341Application$MyJmsInterceptor : ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:gollum.local-63685-1523036676402-4:2:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:gollum.local-63685-1523036676402-4:2:1:1, destination = queue://so496829341, transactionId = null, expiration = 0, timestamp = 1523036681598, arrival = 0, brokerInTime = 1523036681598, brokerOutTime = 1523036681602, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1050, properties = {typeId=com.example.So496829341Application$Foo}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = {"bar":"baz"}}
2018-04-06 13:44:41.634 INFO 82669 --- [enerContainer-1] ication$$EnhancerBySpringCGLIB$ff4b13f : Foo [bar=baz]
EDIT3
避免 AOP...
@SpringBootApplication
@EnableJms
public class So496829341Application {
private final Logger logger = LoggerFactory.getLogger(getClass());
public static void main(String[] args) {
SpringApplication.run(So496829341Application.class, args);
}
@JmsListener(id = "listen1", destination="so496829341")
public void listen(Foo foo) {
logger.info(foo.toString());
}
@Bean
public ApplicationRunner runner(JmsTemplate template) {
return args -> {
Thread.sleep(5_000);
template.convertAndSend("so496829341", new Foo("baz"));
};
}
@Bean
public MessageConverter converter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("typeId");
return converter;
}
@Bean(JmsListenerConfigUtils.JMS_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
public static JmsListenerAnnotationBeanPostProcessor bpp() {
return new JmsListenerAnnotationBeanPostProcessor() {
@Override
protected MethodJmsListenerEndpoint createMethodJmsListenerEndpoint() {
return new MethodJmsListenerEndpoint() {
@Override
protected MessagingMessageListenerAdapter createMessageListener(
MessageListenerContainer container) {
final MessagingMessageListenerAdapter listener = super.createMessageListener(container);
return new MessagingMessageListenerAdapter() {
@Override
public void onMessage(Message jmsMessage, Session session) throws JMSException {
logger.info(jmsMessage.toString());
// validate
listener.onMessage(jmsMessage, session);
}
};
}
};
}
};
}
public static class Foo {
private String bar;
public Foo() {
super();
}
public Foo(String bar) {
this.bar = bar;
}
public String getBar() {
return this.bar;
}
public void setBar(String bar) {
this.bar = bar;
}
@Override
public String toString() {
return "Foo [bar=" + this.bar + "]";
}
}
}
EDIT4
要访问侦听器方法上的其他注释,可以这样做,但是需要反射来获取对 Method
...
的引用
@JmsListener(id = "listen1", destination="so496829341")
@Schema("foo.bar")
public void listen(Foo foo) {
logger.info(foo.toString());
}
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface Schema {
String value();
}
@Bean(JmsListenerConfigUtils.JMS_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
public static JmsListenerAnnotationBeanPostProcessor bpp() {
return new JmsListenerAnnotationBeanPostProcessor() {
@Override
protected MethodJmsListenerEndpoint createMethodJmsListenerEndpoint() {
return new MethodJmsListenerEndpoint() {
@Override
protected MessagingMessageListenerAdapter createMessageListener(
MessageListenerContainer container) {
final MessagingMessageListenerAdapter listener = super.createMessageListener(container);
InvocableHandlerMethod handlerMethod =
(InvocableHandlerMethod) new DirectFieldAccessor(listener)
.getPropertyValue("handlerMethod");
final Schema schema = AnnotationUtils.getAnnotation(handlerMethod.getMethod(), Schema.class);
return new MessagingMessageListenerAdapter() {
@Override
public void onMessage(Message jmsMessage, Session session) throws JMSException {
logger.info(jmsMessage.toString());
logger.info(schema.value());
// validate
listener.onMessage(jmsMessage, session);
}
};
}
};
}
};
}
我需要对所有 jms 侦听器应用一些 pre-checks 和常见步骤,例如根据模式(JSON 模式)验证原始消息。例子-
@Component
public class MyService {
@JmsListener(destination = "myDestination")
public void processOrder(Order order) { ... }
}
现在,在 spring 将消息从 queue 转换为订单之前,我需要执行以下操作 -
- 使用 headers 将原始消息记录到自定义记录器中。
- 根据 json 模式验证 json 消息(文本消息)(为简单起见,假设我这里只有一个模式)
- 如果模式验证失败,记录错误并抛出异常
- 如果schema验证通过,则继续控制到spring做转换,然后继续process order方法。
springJMS 架构是否提供任何方式来注入上述需求? 我知道 AOP 闪过脑海,但我不确定它是否适用于@JmsListener。
一个相当简单的技术是在侦听器容器工厂上将 autoStartup
设置为 false
。
然后,使用 JmsListenerEndpointRegistry
bean 获取侦听器容器。
然后getMessageListener()
,将其包装在AOP代理中,然后setMessageListener()
。
然后启动容器。
可能有更优雅的方法,但我认为您必须进入侦听器创建代码的内部,这非常复杂。
编辑
Spring 引导示例:
@SpringBootApplication
public class So49682934Application {
private final Logger logger = LoggerFactory.getLogger(getClass());
public static void main(String[] args) {
SpringApplication.run(So49682934Application.class, args);
}
@JmsListener(id = "listener1", destination = "so49682934")
public void listen(Foo foo) {
logger.info(foo.toString());
}
@Bean
public ApplicationRunner runner(JmsListenerEndpointRegistry registry, JmsTemplate template) {
return args -> {
DefaultMessageListenerContainer container =
(DefaultMessageListenerContainer) registry.getListenerContainer("listener1");
Object listener = container.getMessageListener();
ProxyFactory pf = new ProxyFactory(listener);
NameMatchMethodPointcutAdvisor advisor = new NameMatchMethodPointcutAdvisor(new MyJmsInterceptor());
advisor.addMethodName("onMessage");
pf.addAdvisor(advisor);
container.setMessageListener(pf.getProxy());
registry.start();
Thread.sleep(5_000);
Foo foo = new Foo("baz");
template.convertAndSend("so49682934", foo);
};
}
@Bean
public MessageConverter converter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("typeId");
return converter;
}
public static class MyJmsInterceptor implements MethodInterceptor {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
Message message = (Message) invocation.getArguments()[0];
logger.info(message.toString());
// validate
return invocation.proceed();
}
}
public static class Foo {
private String bar;
public Foo() {
super();
}
public Foo(String bar) {
this.bar = bar;
}
public String getBar() {
return this.bar;
}
public void setBar(String bar) {
this.bar = bar;
}
@Override
public String toString() {
return "Foo [bar=" + this.bar + "]";
}
}
}
和
spring.jms.listener.auto-startup=false
和
m2018-04-06 11:42:04.859 INFO 59745 --- [enerContainer-1] e.So49682934Application$MyJmsInterceptor : ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:gollum.local-60138-1523029319662-4:2:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:gollum.local-60138-1523029319662-4:2:1:1, destination = queue://so49682934, transactionId = null, expiration = 0, timestamp = 1523029324849, arrival = 0, brokerInTime = 1523029324849, brokerOutTime = 1523029324853, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1050, properties = {typeId=com.example.So49682934Application$Foo}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = {"bar":"baz"}}
2018-04-06 11:42:04.882 INFO 59745 --- [enerContainer-1] ication$$EnhancerBySpringCGLIB$$e29327b8 : Foo [bar=baz]
EDIT2
以下是如何通过基础设施来实现...
@SpringBootApplication
@EnableJms
public class So496829341Application {
private final Logger logger = LoggerFactory.getLogger(getClass());
public static void main(String[] args) {
SpringApplication.run(So496829341Application.class, args);
}
@JmsListener(id = "listen1", destination="so496829341")
public void listen(Foo foo) {
logger.info(foo.toString());
}
@Bean
public ApplicationRunner runner(JmsTemplate template) {
return args -> {
Thread.sleep(5_000);
template.convertAndSend("so496829341", new Foo("baz"));
};
}
@Bean
public MessageConverter converter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("typeId");
return converter;
}
@Bean(JmsListenerConfigUtils.JMS_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
public static JmsListenerAnnotationBeanPostProcessor bpp() {
return new JmsListenerAnnotationBeanPostProcessor() {
@Override
protected MethodJmsListenerEndpoint createMethodJmsListenerEndpoint() {
return new MethodJmsListenerEndpoint() {
@Override
protected MessagingMessageListenerAdapter createMessageListener(
MessageListenerContainer container) {
MessagingMessageListenerAdapter listener = super.createMessageListener(container);
ProxyFactory pf = new ProxyFactory(listener);
pf.setProxyTargetClass(true);
NameMatchMethodPointcutAdvisor advisor = new NameMatchMethodPointcutAdvisor(new MyJmsInterceptor());
advisor.addMethodName("onMessage");
pf.addAdvisor(advisor);
return (MessagingMessageListenerAdapter) pf.getProxy();
}
};
}
};
}
public static class MyJmsInterceptor implements MethodInterceptor {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
Message message = (Message) invocation.getArguments()[0];
logger.info(message.toString());
// validate
return invocation.proceed();
}
}
public static class Foo {
private String bar;
public Foo() {
super();
}
public Foo(String bar) {
this.bar = bar;
}
public String getBar() {
return this.bar;
}
public void setBar(String bar) {
this.bar = bar;
}
@Override
public String toString() {
return "Foo [bar=" + this.bar + "]";
}
}
}
注意:BPP 必须是静态的,并且需要 @EnableJms
,因为此 BPP 的存在会禁用引导。
2018-04-06 13:44:41.607 INFO 82669 --- [enerContainer-1] .So496829341Application$MyJmsInterceptor : ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:gollum.local-63685-1523036676402-4:2:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:gollum.local-63685-1523036676402-4:2:1:1, destination = queue://so496829341, transactionId = null, expiration = 0, timestamp = 1523036681598, arrival = 0, brokerInTime = 1523036681598, brokerOutTime = 1523036681602, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1050, properties = {typeId=com.example.So496829341Application$Foo}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = {"bar":"baz"}}
2018-04-06 13:44:41.634 INFO 82669 --- [enerContainer-1] ication$$EnhancerBySpringCGLIB$ff4b13f : Foo [bar=baz]
EDIT3
避免 AOP...
@SpringBootApplication
@EnableJms
public class So496829341Application {
private final Logger logger = LoggerFactory.getLogger(getClass());
public static void main(String[] args) {
SpringApplication.run(So496829341Application.class, args);
}
@JmsListener(id = "listen1", destination="so496829341")
public void listen(Foo foo) {
logger.info(foo.toString());
}
@Bean
public ApplicationRunner runner(JmsTemplate template) {
return args -> {
Thread.sleep(5_000);
template.convertAndSend("so496829341", new Foo("baz"));
};
}
@Bean
public MessageConverter converter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("typeId");
return converter;
}
@Bean(JmsListenerConfigUtils.JMS_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
public static JmsListenerAnnotationBeanPostProcessor bpp() {
return new JmsListenerAnnotationBeanPostProcessor() {
@Override
protected MethodJmsListenerEndpoint createMethodJmsListenerEndpoint() {
return new MethodJmsListenerEndpoint() {
@Override
protected MessagingMessageListenerAdapter createMessageListener(
MessageListenerContainer container) {
final MessagingMessageListenerAdapter listener = super.createMessageListener(container);
return new MessagingMessageListenerAdapter() {
@Override
public void onMessage(Message jmsMessage, Session session) throws JMSException {
logger.info(jmsMessage.toString());
// validate
listener.onMessage(jmsMessage, session);
}
};
}
};
}
};
}
public static class Foo {
private String bar;
public Foo() {
super();
}
public Foo(String bar) {
this.bar = bar;
}
public String getBar() {
return this.bar;
}
public void setBar(String bar) {
this.bar = bar;
}
@Override
public String toString() {
return "Foo [bar=" + this.bar + "]";
}
}
}
EDIT4
要访问侦听器方法上的其他注释,可以这样做,但是需要反射来获取对 Method
...
@JmsListener(id = "listen1", destination="so496829341")
@Schema("foo.bar")
public void listen(Foo foo) {
logger.info(foo.toString());
}
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface Schema {
String value();
}
@Bean(JmsListenerConfigUtils.JMS_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
public static JmsListenerAnnotationBeanPostProcessor bpp() {
return new JmsListenerAnnotationBeanPostProcessor() {
@Override
protected MethodJmsListenerEndpoint createMethodJmsListenerEndpoint() {
return new MethodJmsListenerEndpoint() {
@Override
protected MessagingMessageListenerAdapter createMessageListener(
MessageListenerContainer container) {
final MessagingMessageListenerAdapter listener = super.createMessageListener(container);
InvocableHandlerMethod handlerMethod =
(InvocableHandlerMethod) new DirectFieldAccessor(listener)
.getPropertyValue("handlerMethod");
final Schema schema = AnnotationUtils.getAnnotation(handlerMethod.getMethod(), Schema.class);
return new MessagingMessageListenerAdapter() {
@Override
public void onMessage(Message jmsMessage, Session session) throws JMSException {
logger.info(jmsMessage.toString());
logger.info(schema.value());
// validate
listener.onMessage(jmsMessage, session);
}
};
}
};
}
};
}