Spring JMS transaction rollback - message dequeued off ActiveMQ

I have a simple Spring Boot application (Spring Boot version 1.5.3.RELEASE) that consumes JMS messages from an ActiveMQ (version 5.14.5) queue.

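I want the messages to be consumed within a JMS transaction. If an exception occurs while a message is being consumed, I expect the transaction to roll back and the message not to be dequeued (removed from the queue). I can see in the Spring logs that the transaction is rolled back, but the message is still dequeued from the ActiveMQ queue (after six redelivery attempts).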

Any pointers would be appreciated.

The application code is as follows:

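import javax.jms.ConnectionFactory;
import javax.jms.Session;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

@SpringBootApplication
public class SpringJmsDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringJmsDemoApplication.class, args);
    }

    // Listener container factory that receives messages in a transacted JMS session
    @Bean
    public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory = new DefaultJmsListenerContainerFactory();
        defaultJmsListenerContainerFactory.setTransactionManager(jmsTransactionManager(connectionFactory));
        defaultJmsListenerContainerFactory.setSessionTransacted(true);
        defaultJmsListenerContainerFactory.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
        configurer.configure(defaultJmsListenerContainerFactory, connectionFactory);
        return defaultJmsListenerContainerFactory;
    }

    @Bean
    public PlatformTransactionManager jmsTransactionManager(ConnectionFactory connectionFactory) {
        return new JmsTransactionManager(connectionFactory);
    }
}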

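import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
public class Receiver {

    @JmsListener(destination = "mailbox", containerFactory = "myFactory")
    @Transactional
    public void receiveMessage(String email) {
        System.out.println("Received <" + email + ">");
        // Always fail, to force a rollback of the JMS transaction
        throw new RuntimeException("nooo");
    }

}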

Here are the logs:

2017-05-24 09:51:59.865 DEBUG 8972 --- [DefaultMessageListenerContainer-1] o.s.j.connection.JmsTransactionManager   : Created JMS transaction on Session [ActiveMQSession {id=ID:D6C0B8467A518-58248-1495590693980-1:32:1,started=false} java.lang.Object@65d647cd] from Connection [ActiveMQConnection 
2017-05-24 09:51:59.867 DEBUG 8972 --- [DefaultMessageListenerContainer-1] o.s.j.l.DefaultMessageListenerContainer  : Received message of type [class org.apache.activemq.command.ActiveMQTextMessage] from consumer [ActiveMQMessageConsumer { value=ID:D6C0B8467A518-58248-1495590693980-1:32:1:1, started=true }] of transactional session [ActiveMQSession {id=ID:D6C0B8467A518-58248-1495590693980-1:32:1,started=true} java.lang.Object@65d647cd]
2017-05-24 09:51:59.867 DEBUG 8972 --- [DefaultMessageListenerContainer-1] o.s.j.l.DefaultMessageListenerContainer  : Rolling back transaction because of listener exception thrown: org.springframework.jms.listener.adapter.ListenerExecutionFailedException: Listener method 'public void com.anz.markets.springjmsdemo.Receiver.receiveMessage(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException: nooo
2017-05-24 09:51:59.867  WARN 8972 --- [DefaultMessageListenerContainer-1] o.s.j.l.DefaultMessageListenerContainer  : Execution of JMS message listener failed, and no ErrorHandler has been set.

org.springframework.jms.listener.adapter.ListenerExecutionFailedException: Listener method 'public void com.anz.markets.springjmsdemo.Receiver.receiveMessage(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException: nooo
    at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:112) ~[spring-jms-4.3.8.RELEASE.jar:4.3.8.RELEASE]
    at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:69) ~[spring-jms-4.3.8.RELEASE.jar:4.3.8.RELEASE]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:721) ~[spring-jms-4.3.8.RELEASE.jar:4.3.8.RELEASE]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:681) ~[spring-jms-4.3.8.RELEASE.jar:4.3.8.RELEASE]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:651) ~[spring-jms-4.3.8.RELEASE.jar:4.3.8.RELEASE]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:317) [spring-jms-4.3.8.RELEASE.jar:4.3.8.RELEASE]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:235) [spring-jms-4.3.8.RELEASE.jar:4.3.8.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1166) [spring-jms-4.3.8.RELEASE.jar:4.3.8.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1158) [spring-jms-4.3.8.RELEASE.jar:4.3.8.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1055) [spring-jms-4.3.8.RELEASE.jar:4.3.8.RELEASE]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_72]
Caused by: java.lang.RuntimeException: nooo
    at com.anz.markets.springjmsdemo.Receiver.receiveMessage(Receiver.java:12) ~[classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_72]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_72]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_72]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_72]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:180) ~[spring-messaging-4.3.8.RELEASE.jar:4.3.8.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:112) ~[spring-messaging-4.3.8.RELEASE.jar:4.3.8.RELEASE]
    at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:104) ~[spring-jms-4.3.8.RELEASE.jar:4.3.8.RELEASE]
    ... 10 common frames omitted

2017-05-24 09:51:59.868 DEBUG 8972 --- [DefaultMessageListenerContainer-1] o.s.j.connection.JmsTransactionManager   : Transactional code has requested rollback
2017-05-24 09:51:59.868 DEBUG 8972 --- [DefaultMessageListenerContainer-1] o.s.j.connection.JmsTransactionManager   : Initiating transaction rollback
2017-05-24 09:51:59.868 DEBUG 8972 --- [DefaultMessageListenerContainer-1] o.s.j.connection.JmsTransactionManager   : Rolling back JMS transaction on Session [ActiveMQSession {id=ID:D6C0B8467A518-58248-1495590693980-1:32:1,started=true} java.lang.Object@65d647cd]

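According to the ActiveMQ message redelivery documentation, messages that fail delivery are sent to a dead letter queue once the redelivery attempts are exhausted (six by default, which matches what you observe), so the message is moved rather than lost (http://activemq.apache.org/message-redelivery-and-dlq-handling.html):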

"The default Dead Letter Queue in ActiveMQ is called ActiveMQ.DLQ; all un-deliverable messages will get sent to this queue and this can be difficult to manage. So, you can set an individualDeadLetterStrategy in the destination policy map of the activemq.xml configuration file, which allows you to specify a specific dead letter queue prefix for a given queue or topic. You can apply this strategy using wild card if you like so that all queues get their own dead-letter queue, as is shown in the example below"

So add an individualDeadLetterStrategy for your queue to the destination policy map in activemq.xml.
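
For illustration, here is a minimal sketch of such a destination policy, modelled on the example in the ActiveMQ documentation linked above. The "DLQ." prefix and the ">" wildcard are choices made for this sketch, not requirements, and the fragment is assumed to sit inside the <broker> element of your activemq.xml:

```xml
<!-- Sketch only: place inside the <broker> element of activemq.xml -->
<destinationPolicy>
  <policyMap>
    <policyEntries>
      <!-- ">" matches all queues; use queue="mailbox" to target only that queue -->
      <policyEntry queue=">">
        <deadLetterStrategy>
          <!-- Failed messages from queue X go to DLQ.X instead of the shared ActiveMQ.DLQ -->
          <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
        </deadLetterStrategy>
      </policyEntry>
    </policyEntries>
  </policyMap>
</destinationPolicy>
```

With a policy like this, a message that keeps failing on the mailbox queue should end up in DLQ.mailbox after the redelivery attempts are exhausted, where it can be inspected or replayed separately from other queues' failures.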