Using a shared JmsTransactionManager with spring-boot to read/write messages on the same broker without XA
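I have a spring-boot service that reads from and writes to the same IBM MQ message broker. The process is standalone and does not run inside an application container. I want to implement the "shared transactional resource" pattern so that I do not need to configure any JTA/XA transaction management. The happy path works, but the following edge case does not roll back the message publish: the read is rolled back, but the publish is still committed.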
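GIVEN a MessageListener receives a message
AND publishes a message to another queue using the same JMS ConnectionFactory
WHEN an exception is thrown from onMessage() after the message has been published
THEN the message is rolled back onto the READ queue and is not published to the WRITE queue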
My code looks like this:
@Component
public class MyJmsReceiver implements MessageListener
{
    @Autowired MyJmsSender myJmsSender;

    @Override
    public void onMessage(Message message)
    {
        myJmsSender.sendMessage("some-payload");
        // simulate a failure after the message has been published
        if(true) throw new RuntimeException("BOOM!");
    }
}

@Component
public class MyJmsSender
{
    // the template bean defined in Config below
    @Autowired JmsTemplate jmsTemplate;

    @Transactional(propagation = Propagation.MANDATORY)
    public void sendMessage(final String payload)
    {
        jmsTemplate.convertAndSend("QUEUE.OUT", payload);
    }
}
@Configuration
@EnableJms
@EnableTransactionManagement
public class Config
{
    @Bean
    public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory)
    {
        // using a SingleConnectionFactory gives us one reusable connection rather than opening a new one for each message published
        JmsTemplate jmsTemplate = new JmsTemplate(new SingleConnectionFactory(connectionFactory));
        jmsTemplate.setSessionTransacted(true);
        return jmsTemplate;
    }

    @Bean
    public DefaultMessageListenerContainer defaultMessageListenerContainer(
            ConnectionFactory connectionFactory,
            PlatformTransactionManager transactionManager,
            MyJmsReceiver myJmsReceiver)
    {
        DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
        dmlc.setConnectionFactory(connectionFactory);
        dmlc.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
        dmlc.setSessionTransacted(true);
        dmlc.setTransactionManager(transactionManager);
        dmlc.setConcurrency(concurrency);
        dmlc.setDestinationName("QUEUE.IN");
        dmlc.setMessageListener(myJmsReceiver);
        return dmlc;
    }

    @Bean
    public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) {
        return new JmsTransactionManager(connectionFactory);
    }

    @Bean
    public ConnectionFactory connectionFactory(
            @Value("${jms.host}") String host,
            @Value("${jms.port}") int port,
            @Value("${jms.queue.manager}") String queueManager,
            @Value("${jms.channel}") String channel
    ) throws JMSException
    {
        MQConnectionFactory ibmMq = new MQConnectionFactory();
        ibmMq.setHostName(host);
        ibmMq.setPort(port);
        ibmMq.setQueueManager(queueManager);
        ibmMq.setTransportType(WMQConstants.WMQ_CM_CLIENT);
        ibmMq.setChannel(channel);
        return ibmMq;
    }
}
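When I enable logging for JmsTransactionManager, I can see that the publish is "Participating in existing transaction", that no new transaction is created, and that the DMLC rolls the transaction back. However, I still see the message being published, while the message that was read is put back on the queue.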
2020-09-07_13:21:33.000 [defaultMessageListenerContainer-1] DEBUG o.s.j.c.JmsTransactionManager - Creating new transaction with name [defaultMessageListenerContainer]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2020-09-07_13:21:33.015 [defaultMessageListenerContainer-1] DEBUG o.s.j.c.JmsTransactionManager - Created JMS transaction on Session [com.ibm.mq.jms.MQQueueSession@6934ab89] from Connection [com.ibm.mq.jms.MQQueueConnection@bd527da]
2020-09-07_13:21:33.034 [defaultMessageListenerContainer-1] INFO c.l.c.c.r.MyJmsReceiver - "Read message from QUEUE.IN for messageId ID:414d51204c43482e434c4b2e545354205f49ea352c992702
2020-09-07_13:21:33.054 [defaultMessageListenerContainer-1] DEBUG o.s.j.c.JmsTransactionManager - Participating in existing transaction
2020-09-07_13:21:33.056 [defaultMessageListenerContainer-1] INFO c.l.c.c.p.r.MyJmsSender - Sending message to queue: QUEUE.OUT
2020-09-07_13:21:33.077 [defaultMessageListenerContainer-1] ERROR c.l.c.c.r.MyJmsReceiver - Failed to process messageId: ID:414d51204c43482e434c4b2e545354205f49ea352c992702 with RuntimeException: BOOM!
2020-09-07_13:21:33.096 [defaultMessageListenerContainer-1] WARN o.s.j.l.DefaultMessageListenerContainer - Execution of JMS message listener failed, and no ErrorHandler has been set.
com.xxx.receive.MessageListenerException: java.lang.RuntimeException: BOOM!
at com.xxx.MyJmsReceiver.onMessage(MyJmsReceiver.java:83)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:761)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:699)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:245)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: BOOM!
at com.xxx.MyJmsReceiver.onMessage(MyJmsReceiver.java:74)
... 9 common frames omitted
2020-09-07_13:21:33.097 [defaultMessageListenerContainer-1] DEBUG o.s.j.c.JmsTransactionManager - Transactional code has requested rollback
2020-09-07_13:21:33.097 [defaultMessageListenerContainer-1] DEBUG o.s.j.c.JmsTransactionManager - Initiating transaction rollback
2020-09-07_13:21:33.097 [defaultMessageListenerContainer-1] DEBUG o.s.j.c.JmsTransactionManager - Rolling back JMS transaction on Session [com.ibm.mq.jms.MQQueueSession@6934ab89]
2020-09-07_13:21:33.107 [defaultMessageListenerContainer-1] DEBUG o.s.j.c.JmsTransactionManager - Creating new transaction with name [defaultMessageListenerContainer]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2020-09-07_13:21:33.123 [defaultMessageListenerContainer-1] DEBUG o.s.j.c.JmsTransactionManager - Created JMS transaction on Session [com.ibm.mq.jms.MQQueueSession@8d93093] from Connection [com.ibm.mq.jms.MQQueueConnection@610b3b42]
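Is there a way to get this working without bringing in a formal XA library such as Atomikos?
My understanding is that ChainedTransactionManager would not solve my problem either, because once the inner transaction (i.e. the publish) has been committed, the outer transaction cannot roll that commit back.
The message publish is literally the last thing onMessage() does.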
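The SingleConnectionFactory defined in the JmsTemplate is the problem. It gives the sender a new connection, and therefore a new session, which makes it impossible to reuse the running transaction started by the listener.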
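To illustrate why the wrapper matters, here is a deliberately simplified, plain-Java model of the lookup. It is not Spring code. It assumes only that the transaction manager binds the listener's session to the current thread under the ConnectionFactory object it was given, and that a sender looks the session up under its own ConnectionFactory object. All class and method names below (FakeConnectionFactory, WrappingConnectionFactory, beginTransaction, sessionFor) are hypothetical stand-ins.

```java
import java.util.IdentityHashMap;
import java.util.Map;

public class SessionBindingSketch {

    /** Hypothetical stand-in for a JMS ConnectionFactory; only object identity matters here. */
    static class FakeConnectionFactory {
        final String name;
        FakeConnectionFactory(String name) { this.name = name; }
    }

    /** Hypothetical stand-in for a wrapper: a different object around the same target factory. */
    static class WrappingConnectionFactory extends FakeConnectionFactory {
        final FakeConnectionFactory target;
        WrappingConnectionFactory(FakeConnectionFactory target) {
            super("wrapper(" + target.name + ")");
            this.target = target;
        }
    }

    /** Per-thread registry of transactional sessions, keyed by the factory object. */
    static final ThreadLocal<Map<FakeConnectionFactory, String>> BOUND =
            ThreadLocal.withInitial(IdentityHashMap::new);

    /** Models the transaction manager binding the listener's session when the transaction starts. */
    static void beginTransaction(FakeConnectionFactory key, String session) {
        BOUND.get().put(key, session);
    }

    /** Models a sender: reuse the bound session if its own factory is the key, otherwise open a new one. */
    static String sessionFor(FakeConnectionFactory key) {
        String bound = BOUND.get().get(key);
        return bound != null ? bound : "new-session-for-" + key.name;
    }

    public static void main(String[] args) {
        FakeConnectionFactory broker = new FakeConnectionFactory("ibmMq");
        beginTransaction(broker, "listener-session");

        // Sender built on the same factory object: it finds and joins the listener's session.
        System.out.println(sessionFor(broker));
        // Sender built on a wrapper: the lookup misses, so it works on a separate session.
        System.out.println(sessionFor(new WrappingConnectionFactory(broker)));

        BOUND.remove();
    }
}
```

In this model the second lookup returns a separate session, which is the situation described above: the sender's work is no longer part of the session that the listener's transaction rolls back.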
Use a CachingDestinationResolver instead of the SingleConnectionFactory to improve performance:
@Bean
public CachingDestinationResolver cachingDestinationResolver()
{
    JndiDestinationResolver destinationResolver = new JndiDestinationResolver();
    destinationResolver.setFallbackToDynamicDestination(true);
    return destinationResolver;
}

@Bean
public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory,
        CachingDestinationResolver destinationResolver)
{
    JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
    jmsTemplate.setDestinationResolver(destinationResolver);
    jmsTemplate.setSessionTransacted(true);
    return jmsTemplate;
}

@Bean
public DefaultMessageListenerContainer defaultMessageListenerContainer(
        ConnectionFactory connectionFactory,
        PlatformTransactionManager transactionManager,
        MyJmsReceiver myJmsReceiver,
        CachingDestinationResolver destinationResolver)
{
    DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
    dmlc.setConnectionFactory(connectionFactory);
    dmlc.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
    dmlc.setSessionTransacted(true);
    dmlc.setTransactionManager(transactionManager);
    dmlc.setConcurrency(concurrency);
    dmlc.setDestinationName("MY.QUEUE.IN");
    dmlc.setDestinationResolver(destinationResolver);
    dmlc.setMessageListener(myJmsReceiver);
    return dmlc;
}
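As a rough illustration of what sharing one session buys you, the following plain-Java sketch models a single transacted session in memory. It is not JMS or IBM MQ code, and FakeQueue, FakeTransactedSession and process are hypothetical names. It assumes only the usual local-transaction behaviour: a received message goes back to its queue on rollback, and a sent message becomes visible only on commit.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class TransactedSessionSketch {

    /** In-memory stand-in for a broker queue. */
    static class FakeQueue {
        final Deque<String> messages = new ArrayDeque<>();
    }

    /** Stand-in for one transacted session: all work is provisional until commit(). */
    static class FakeTransactedSession {
        private final List<Runnable> onCommit = new ArrayList<>();
        private final List<Runnable> onRollback = new ArrayList<>();

        /** Take a message off the queue and remember how to put it back on rollback. */
        String receive(FakeQueue from) {
            String msg = from.messages.pollFirst();
            if (msg != null) {
                onRollback.add(() -> from.messages.addFirst(msg));
            }
            return msg;
        }

        /** Buffer a send; it reaches the target queue only when the session commits. */
        void send(FakeQueue to, String payload) {
            onCommit.add(() -> to.messages.addLast(payload));
        }

        void commit() {
            onCommit.forEach(Runnable::run);
            clear();
        }

        void rollback() {
            onRollback.forEach(Runnable::run);
            clear();
        }

        private void clear() {
            onCommit.clear();
            onRollback.clear();
        }
    }

    /** Receive and publish on the same session; undo both if processing fails. */
    static void process(FakeQueue in, FakeQueue out, boolean fail) {
        FakeTransactedSession session = new FakeTransactedSession();
        try {
            String msg = session.receive(in);
            session.send(out, "processed:" + msg);
            if (fail) {
                throw new RuntimeException("BOOM!");
            }
            session.commit();
        } catch (RuntimeException e) {
            session.rollback();
        }
    }

    public static void main(String[] args) {
        FakeQueue in = new FakeQueue();
        FakeQueue out = new FakeQueue();
        in.messages.add("m1");

        process(in, out, true);   // failure: m1 is back on "in", nothing on "out"
        System.out.println(in.messages + " " + out.messages);

        process(in, out, false);  // success: "in" is drained, "out" holds the result
        System.out.println(in.messages + " " + out.messages);
    }
}
```

The point of the sketch is that the read and the publish are undone together only because they were recorded on the same session object; if the send had gone through a second session, the rollback here would not touch it.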