Validate raw message against schema for method annotated with @JmsListener

I need to apply some pre-checks and common steps to all JMS listeners, such as validating the raw message against a schema (JSON schema). Example -

@Component
public class MyService {

    @JmsListener(destination = "myDestination")
    public void processOrder(Order order) { ... }
}

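Now, before Spring converts the message from the queue to an Order, I need to do the following -

  1. Log the raw message, with headers, to a custom logger.
  2. Validate the JSON message (a text message) against a JSON schema (for simplicity, assume I have only one schema here).
  3. If schema validation fails, log the error and throw an exception.
  4. If schema validation passes, hand control back to Spring to do the conversion and then continue to the processOrder method.

Does the Spring JMS architecture provide any way to inject the above requirements? AOP comes to mind, but I am not sure whether it will work with @JmsListener.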

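A rather simple technique is to set autoStartup to false on the listener container factory.

Then, use the JmsListenerEndpointRegistry bean to get the listener container.

Then call getMessageListener(), wrap the listener in an AOP proxy, and pass the proxy to setMessageListener().

Then start the container.

There might be a more elegant way, but I think you would have to get into the guts of the listener creation code, which is quite involved.

EDIT

Example with Spring Boot: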

@SpringBootApplication
public class So49682934Application {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    public static void main(String[] args) {
        SpringApplication.run(So49682934Application.class, args);
    }

    @JmsListener(id = "listener1", destination = "so49682934")
    public void listen(Foo foo) {
        logger.info(foo.toString());
    }

    @Bean
    public ApplicationRunner runner(JmsListenerEndpointRegistry registry, JmsTemplate template) {
        return args -> {
            DefaultMessageListenerContainer container =
                    (DefaultMessageListenerContainer) registry.getListenerContainer("listener1");
            Object listener = container.getMessageListener();
            ProxyFactory pf = new ProxyFactory(listener);
            NameMatchMethodPointcutAdvisor advisor = new NameMatchMethodPointcutAdvisor(new MyJmsInterceptor());
            advisor.addMethodName("onMessage");
            pf.addAdvisor(advisor);
            container.setMessageListener(pf.getProxy());
            registry.start();
            Thread.sleep(5_000);
            Foo foo = new Foo("baz");
            template.convertAndSend("so49682934", foo);
        };
    }

    @Bean
    public MessageConverter converter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("typeId");
        return converter;
    }

    public static class MyJmsInterceptor implements MethodInterceptor {

        private final Logger logger = LoggerFactory.getLogger(getClass());

        @Override
        public Object invoke(MethodInvocation invocation) throws Throwable {
            Message message = (Message) invocation.getArguments()[0];
            logger.info(message.toString());
            // validate
            return invocation.proceed();
        }

    }

    public static class Foo {

        private String bar;

        public Foo() {
            super();
        }

        public Foo(String bar) {
            this.bar = bar;
        }

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }

    }

}

spring.jms.listener.auto-startup=false

2018-04-06 11:42:04.859 INFO 59745 --- [enerContainer-1] e.So49682934Application$MyJmsInterceptor : ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:gollum.local-60138-1523029319662-4:2:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:gollum.local-60138-1523029319662-4:2:1:1, destination = queue://so49682934, transactionId = null, expiration = 0, timestamp = 1523029324849, arrival = 0, brokerInTime = 1523029324849, brokerOutTime = 1523029324853, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1050, properties = {typeId=com.example.So49682934Application$Foo}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = {"bar":"baz"}}

2018-04-06 11:42:04.882 INFO 59745 --- [enerContainer-1] ication$$EnhancerBySpringCGLIB$$e29327b8 : Foo [bar=baz]
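
To show the wrapping idea on its own, here is a minimal sketch that needs no Spring or JMS on the classpath. It uses the JDK's java.lang.reflect.Proxy in place of Spring's ProxyFactory, so it is an analogy to the example above rather than the same mechanism. RawListener, InterceptingProxyDemo and the preCheck parameter are hypothetical names invented for this illustration.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.function.Consumer;

/** Hypothetical stand-in for a message listener; not the real JMS interface. */
interface RawListener {
    void onMessage(String rawText);
}

final class InterceptingProxyDemo {

    /** Returns a proxy that runs preCheck before each onMessage call on target. */
    static RawListener wrap(RawListener target, Consumer<String> preCheck) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Only intercept onMessage, like the name-match advisor above.
            if ("onMessage".equals(method.getName())) {
                preCheck.accept((String) args[0]); // throwing here rejects the message
            }
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                throw e.getCause(); // rethrow the listener's own exception
            }
        };
        return (RawListener) Proxy.newProxyInstance(
                RawListener.class.getClassLoader(),
                new Class<?>[] {RawListener.class},
                handler);
    }

    public static void main(String[] args) {
        RawListener listener = text -> System.out.println("processing " + text);
        RawListener checked = wrap(listener, text -> {
            if (text == null || text.trim().isEmpty()) {
                throw new IllegalArgumentException("empty payload");
            }
        });
        checked.onMessage("{\"bar\":\"baz\"}");
    }
}
```

The pre-check runs on the raw text before the target sees it, which is the point at which the "// validate" placeholder sits in the interceptor above.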

EDIT2

Here's how to do it via the infrastructure...

@SpringBootApplication
@EnableJms
public class So496829341Application {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    public static void main(String[] args) {
        SpringApplication.run(So496829341Application.class, args);
    }

    @JmsListener(id = "listen1", destination="so496829341")
    public void listen(Foo foo) {
        logger.info(foo.toString());
    }

    @Bean
    public ApplicationRunner runner(JmsTemplate template) {
        return args -> {
            Thread.sleep(5_000);
            template.convertAndSend("so496829341", new Foo("baz"));
        };
    }

    @Bean
    public MessageConverter converter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("typeId");
        return converter;
    }

    @Bean(JmsListenerConfigUtils.JMS_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    public static JmsListenerAnnotationBeanPostProcessor bpp() {
        return new JmsListenerAnnotationBeanPostProcessor() {

            @Override
            protected MethodJmsListenerEndpoint createMethodJmsListenerEndpoint() {
                return new MethodJmsListenerEndpoint() {

                    @Override
                    protected MessagingMessageListenerAdapter createMessageListener(
                            MessageListenerContainer container) {
                        MessagingMessageListenerAdapter listener = super.createMessageListener(container);
                        ProxyFactory pf = new ProxyFactory(listener);
                        pf.setProxyTargetClass(true);
                        NameMatchMethodPointcutAdvisor advisor = new NameMatchMethodPointcutAdvisor(new MyJmsInterceptor());
                        advisor.addMethodName("onMessage");
                        pf.addAdvisor(advisor);
                        return (MessagingMessageListenerAdapter) pf.getProxy();
                    }

                };
            }

        };
    }

    public static class MyJmsInterceptor implements MethodInterceptor {

        private final Logger logger = LoggerFactory.getLogger(getClass());

        @Override
        public Object invoke(MethodInvocation invocation) throws Throwable {
            Message message = (Message) invocation.getArguments()[0];
            logger.info(message.toString());
            // validate
            return invocation.proceed();
        }

    }

    public static class Foo {

        private String bar;

        public Foo() {
            super();
        }

        public Foo(String bar) {
            this.bar = bar;
        }

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }

    }

}

NOTE: the BPP must be static, and @EnableJms is required because the presence of this BPP disables Boot's own JMS listener configuration.

2018-04-06 13:44:41.607 INFO 82669 --- [enerContainer-1] .So496829341Application$MyJmsInterceptor : ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:gollum.local-63685-1523036676402-4:2:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:gollum.local-63685-1523036676402-4:2:1:1, destination = queue://so496829341, transactionId = null, expiration = 0, timestamp = 1523036681598, arrival = 0, brokerInTime = 1523036681598, brokerOutTime = 1523036681602, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1050, properties = {typeId=com.example.So496829341Application$Foo}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = {"bar":"baz"}}

2018-04-06 13:44:41.634 INFO 82669 --- [enerContainer-1] ication$$EnhancerBySpringCGLIB$ff4b13f : Foo [bar=baz]

EDIT3

Avoiding AOP...

@SpringBootApplication
@EnableJms
public class So496829341Application {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    public static void main(String[] args) {
        SpringApplication.run(So496829341Application.class, args);
    }

    @JmsListener(id = "listen1", destination="so496829341")
    public void listen(Foo foo) {
        logger.info(foo.toString());
    }

    @Bean
    public ApplicationRunner runner(JmsTemplate template) {
        return args -> {
            Thread.sleep(5_000);
            template.convertAndSend("so496829341", new Foo("baz"));
        };
    }

    @Bean
    public MessageConverter converter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("typeId");
        return converter;
    }

    @Bean(JmsListenerConfigUtils.JMS_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    public static JmsListenerAnnotationBeanPostProcessor bpp() {
        return new JmsListenerAnnotationBeanPostProcessor() {

            @Override
            protected MethodJmsListenerEndpoint createMethodJmsListenerEndpoint() {
                return new MethodJmsListenerEndpoint() {

                    @Override
                    protected MessagingMessageListenerAdapter createMessageListener(
                            MessageListenerContainer container) {
                        final MessagingMessageListenerAdapter listener = super.createMessageListener(container);
                        return new MessagingMessageListenerAdapter() {

                            @Override
                            public void onMessage(Message jmsMessage, Session session) throws JMSException {
                                logger.info(jmsMessage.toString());
                                // validate
                                listener.onMessage(jmsMessage, session);
                            }

                        };
                    }

                };
            }

        };
    }

    public static class Foo {

        private String bar;

        public Foo() {
            super();
        }

        public Foo(String bar) {
            this.bar = bar;
        }

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }

    }

}
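
As a rough illustration of what could sit behind the "// validate" placeholder, the sketch below walks through the four steps from the question (log, validate, reject, delegate) as a plain decorator. It is framework-free and makes several assumptions: RawMessage, SchemaViolationException, ValidatingListener and Validators are hypothetical names, and requireProperties is only a crude substring check standing in for a real JSON Schema validator, which you would plug in through the same Function.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Logger;

/** Hypothetical raw message: headers plus the unconverted text payload. */
final class RawMessage {
    final Map<String, String> headers;
    final String text;

    RawMessage(Map<String, String> headers, String text) {
        this.headers = headers;
        this.text = text;
    }
}

/** Thrown when the payload fails the pre-conversion check. */
final class SchemaViolationException extends RuntimeException {
    SchemaViolationException(String message) {
        super(message);
    }
}

/** Decorator that logs, validates, and only then delegates to the real listener. */
final class ValidatingListener implements Consumer<RawMessage> {
    private static final Logger LOG = Logger.getLogger(ValidatingListener.class.getName());

    private final Function<String, List<String>> validator; // empty list means valid
    private final Consumer<RawMessage> delegate;

    ValidatingListener(Function<String, List<String>> validator, Consumer<RawMessage> delegate) {
        this.validator = validator;
        this.delegate = delegate;
    }

    @Override
    public void accept(RawMessage message) {
        LOG.info(() -> "headers=" + message.headers + " body=" + message.text); // step 1
        List<String> violations = validator.apply(message.text);               // step 2
        if (!violations.isEmpty()) {                                            // step 3
            LOG.severe(() -> "schema validation failed: " + violations);
            throw new SchemaViolationException(String.join("; ", violations));
        }
        delegate.accept(message);                                               // step 4
    }
}

final class Validators {
    /** Placeholder check only: reports each name that never appears as a quoted key. */
    static Function<String, List<String>> requireProperties(String... names) {
        return text -> {
            List<String> missing = new ArrayList<>();
            for (String name : names) {
                if (text == null || !text.contains("\"" + name + "\"")) {
                    missing.add("missing property: " + name);
                }
            }
            return missing;
        };
    }
}
```

Keeping the validator behind a Function means the decorator does not care which schema library produces the list of violations.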

EDIT4

To access other annotations on the listener method, it can be done, but reflection is needed to get a reference to the Method...

@JmsListener(id = "listen1", destination="so496829341")
@Schema("foo.bar")
public void listen(Foo foo) {
    logger.info(foo.toString());
}

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface Schema {

    String value();

}

@Bean(JmsListenerConfigUtils.JMS_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
public static JmsListenerAnnotationBeanPostProcessor bpp() {
    return new JmsListenerAnnotationBeanPostProcessor() {

        @Override
        protected MethodJmsListenerEndpoint createMethodJmsListenerEndpoint() {
            return new MethodJmsListenerEndpoint() {

                @Override
                protected MessagingMessageListenerAdapter createMessageListener(
                        MessageListenerContainer container) {
                    final MessagingMessageListenerAdapter listener = super.createMessageListener(container);
                    InvocableHandlerMethod handlerMethod =
                            (InvocableHandlerMethod) new DirectFieldAccessor(listener)
                                    .getPropertyValue("handlerMethod");
                    final Schema schema = AnnotationUtils.getAnnotation(handlerMethod.getMethod(), Schema.class);
                    return new MessagingMessageListenerAdapter() {

                        @Override
                        public void onMessage(Message jmsMessage, Session session) throws JMSException {
                            logger.info(jmsMessage.toString());
                            if (schema != null) { // listeners without @Schema have nothing to report
                                logger.info(schema.value());
                            }
                            // validate
                            listener.onMessage(jmsMessage, session);
                        }

                    };
                }

            };
        }

    };
}
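
One thing to watch for with the annotation lookup: a listener method that carries no @Schema yields null, so the schema name needs a null-safe read. Below is a small standalone sketch of that lookup using plain JDK reflection instead of Spring's AnnotationUtils. The Schema annotation is re-declared here only so the snippet compiles on its own; Handlers and SchemaLookup are hypothetical names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Optional;

/** Re-declared for this sketch; must be RUNTIME-retained to be visible to reflection. */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface Schema {
    String value();
}

/** Hypothetical listener class with one annotated and one plain method. */
final class Handlers {
    @Schema("foo.bar")
    public void listen(String payload) {
    }

    public void listenUnchecked(String payload) {
    }
}

final class SchemaLookup {

    /** Returns the schema name declared on the first method with that name, if any. */
    static Optional<String> schemaFor(Class<?> type, String methodName) {
        for (Method method : type.getDeclaredMethods()) {
            if (method.getName().equals(methodName)) {
                // getAnnotation returns null when the annotation is absent
                return Optional.ofNullable(method.getAnnotation(Schema.class))
                        .map(Schema::value);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(schemaFor(Handlers.class, "listen"));
        System.out.println(schemaFor(Handlers.class, "listenUnchecked"));
    }
}
```

With an Optional in hand, the wrapper can decide per listener whether to validate against the named schema or skip validation.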