使用 Jdbc XA 和 Jms 的 JtaTransaction
JtaTransaction with Jdbc XA and Jms
所以,我有一个 spring 引导项目,我需要在其中执行此操作:
- 开始交易
-A JDBC 读取状态为 TO_SEND 的行的轮询器,
- 为每一行发送一个 Jms,
-更新状态'SENT'
-提交事务或失败回滚
服务器是 Weblogic,具有处理行的 XA 数据源、Jms 的 XA 工厂、jndi 上下文和 spring 集成轮询器 (jdbcpollingchaneladapter) 和 jta 事务:
如在此 doc 中所见,为此,我必须将 JtaTransaction 与 userTransaction 一起使用,并创建一个非事务性 Jms 会话
// DATABASE Poller using JdbcPollingChannelAdapter
@Bean
@InboundChannelAdapter(channel = "jpaInputChannel", poller = @Poller(value = "pollerMetadata"))
public MessageSource<?> jpaInbound() {
// Select request by status = 'TO_SEND'
JdbcPollingChannelAdapter j = new JdbcPollingChannelAdapter(datasource,
StgOutJmsRepository.FIND_FILTER_BY_STATUS_SQL);
StgOutJms stg = new StgOutJms();
j.setRowMapper(stg);
return j;
}
//Poller metadata with jta Transaction
@Bean
public PollerMetadata pollerMetadata() throws NamingException {
return Pollers.fixedDelay(Long.valueOf(env.getProperty("poller.interval")))
.transactional(transactionManager).get();
}
使用 userTransaction 的 Jta 事务管理器:
@Bean
public PlatformTransactionManager transactionManager() throws NamingException {
Hashtable<String, String> properties = new Hashtable<>();
properties.put(Context.INITIAL_CONTEXT_FACTORY, env.getProperty(WebLogicConstant.JNDI_FACTORY));
properties.put(Context.PROVIDER_URL, env.getProperty(WebLogicConstant.JMS_WEBLOGIC_URL_SEND));
InitialContext vInitialContext = new InitialContext(properties);
UserTransaction xact = (UserTransaction) vInitialContext.lookup("javax.transaction.UserTransaction");
return new JtaTransactionManager(xact);
}
进程:
// Service Activator : Lunching the Jms Creation for each row
@Bean
@ServiceActivator(inputChannel = "jpaInputChannel")
public MessageHandler handler() {
return wlstoreMessage -> {
try {
jmsSenderService.
consumeMessage((List<StgOutJms>) wlstoreMessage.getPayload());
} catch (NamingException | JMSException e) {
log.error(e.getMessage(), e);
}
};
}
@Override
public void consumeMessage(List<StgOutJms> stgEntityList) throws NamingException, JMSException {
logger.info("JMS: Consume messages");
for (StgOutJms stgOutEntity : stgEntityList) {
if (nonNull(stgOutEntity) && nonNull(stgOutEntity.getIdentifiantUniqueLot())) {
sendMessage(stgOutEntity);
stgOutEntity.setStatus("SENT");
repositoryOut.save(stgOutEntity);
} else {
logger.error("The id of the object received is null");
}
}
}
Jms 连接:
@Override
public void initQueueConnection() throws NamingException, JMSException {
Hashtable<String, String> properties = new Hashtable<String, String>();
properties.put(Context.INITIAL_CONTEXT_FACTORY, env.getProperty(WebLogicConstant.JNDI_FACTORY));
properties.put(Context.PROVIDER_URL, env.getProperty(WebLogicConstant.JMS_WEBLOGIC_URL_SEND));
InitialContext vInitialContext = new InitialContext(properties);
QueueConnectionFactory vQueueConnectionFactory = (QueueConnectionFactory) vInitialContext
.lookup(env.getProperty(WebLogicConstant.JMS_FACTORY_SEND));
vQueueConnection = vQueueConnectionFactory.createQueueConnection();
vQueueConnection.start();
vQueueSession = vQueueConnection.createQueueSession(false, 0);
Queue vQueue = (Queue) vInitialContext.lookup(env.getProperty(WebLogicConstant.JMS_QUEUE_SEND));
vQueueSender = vQueueSession.createSender(vQueue);
}
此代码的问题在于 Jms 消息是在事务中发送的(成功时提交,失败时回滚)但发送的状态永远不会更新(crudrepository)。
此外,我尝试使用 jpaTransactionManager,它对数据库保存很有用,但是 Jms 消息是在事务提交之前发送的(失败时没有 jms 回滚)。
非常感谢您的帮助!
我不熟悉Weblogic,但在WebSphere 中我将DataSource 配置为XA,将JMS 连接工厂配置为XA。并使用 JNDI 的 XA 事务管理器将类似于您的调用包装到一个全局 JTA 事务中。
我也可以建议查看这篇文章如何避免 XA 事务:https://www.infoworld.com/article/2077963/distributed-transactions-in-spring--with-and-without-xa.html
经过几天的研究,我找到的唯一解决方案是将事务管理器传递给 jdbc 轮询器,并使用 JmsTemplate 和 session transacted = true,这样 jms 的 commit/rollback 和jdbc 是在同一笔交易中完成的。
// JMS Beans
@Bean
public JndiTemplate jndiTemplate() {
final Properties jndiProps = new Properties();
jndiProps.setProperty(Context.INITIAL_CONTEXT_FACTORY, env.getProperty(WebLogicConstant.JNDI_FACTORY));
jndiProps.setProperty(Context.PROVIDER_URL, env.getProperty(WebLogicConstant.JMS_WEBLOGIC_URL_SEND));
JndiTemplate jndiTemplate = new JndiTemplate();
jndiTemplate.setEnvironment(jndiProps);
return jndiTemplate;
}
@Bean
public JndiObjectFactoryBean queueConnectionFactory() {
JndiObjectFactoryBean queueConnectionFactory = new JndiObjectFactoryBean();
queueConnectionFactory.setJndiTemplate(jndiTemplate());
queueConnectionFactory.setJndiName(env.getProperty(WebLogicConstant.JMS_FACTORY_SEND));
return queueConnectionFactory;
}
@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate jmsTemplate = new JmsTemplate((ConnectionFactory) queueConnectionFactory().getObject());
jmsTemplate.setReceiveTimeout(500);
jmsTemplate.setSessionTransacted(true);
return jmsTemplate;
}
@Bean
public JndiObjectFactoryBean jmsQueueOut() {
JndiObjectFactoryBean jmsQueue = new JndiObjectFactoryBean();
jmsQueue.setJndiTemplate(jndiTemplate());
jmsQueue.setJndiName(env.getProperty(WebLogicConstant.JMS_QUEUE_SEND));
return jmsQueue;
}
通过事务管理器使用轮询器
@Bean
public PollerMetadata pollerMetadata() {
return Pollers
.fixedDelay(Long.valueOf(env.getProperty("poller.interval"))).transactional(transactionManager)
.get();
}
// DATABASE Poller using JdbcPollingChannelAdapter
@Bean
@InboundChannelAdapter(channel = "jpaInputChannel", poller = @Poller(value = "pollerMetadata"))
public MessageSource<?> jpaInbound() {
// Select request by status = 'TO_SEND'
JdbcPollingChannelAdapter poller = new JdbcPollingChannelAdapter(datasource,
StgOutJmsRepository.FIND_FILTER_BY_STATUS_SQL);
// RowMapper for mapping the list returned to the entity StgOutJms
poller.setRowMapper(new StgOutJms());
poller.setMaxRowsPerPoll(10);
return poller;
}
还有一个服务激活器
// Service Activator : Lunching the Jms Creation for each row
@Bean
@ServiceActivator(inputChannel = "jpaInputChannel")
public MessageHandler handler() {
return wlstoreMessage -> {
try {
jmsSenderService.consumeMessage((Destination) jmsQueueOut().getObject(),
(List<StgOutJms>) wlstoreMessage.getPayload());
} catch (NamingException | JMSException e) {
log.error(e.getMessage(), e);
}
};
}
希望对您有所帮助。
所以,我有一个 spring 引导项目,我需要在其中执行此操作:
- 开始交易 -A JDBC 读取状态为 TO_SEND 的行的轮询器, - 为每一行发送一个 Jms, -更新状态'SENT' -提交事务或失败回滚
服务器是 Weblogic,具有处理行的 XA 数据源、Jms 的 XA 工厂、jndi 上下文和 spring 集成轮询器 (jdbcpollingchaneladapter) 和 jta 事务:
如在此 doc 中所见,为此,我必须将 JtaTransaction 与 userTransaction 一起使用,并创建一个非事务性 Jms 会话
// DATABASE Poller using JdbcPollingChannelAdapter
@Bean
@InboundChannelAdapter(channel = "jpaInputChannel", poller = @Poller(value = "pollerMetadata"))
public MessageSource<?> jpaInbound() {
// Select request by status = 'TO_SEND'
JdbcPollingChannelAdapter j = new JdbcPollingChannelAdapter(datasource,
StgOutJmsRepository.FIND_FILTER_BY_STATUS_SQL);
StgOutJms stg = new StgOutJms();
j.setRowMapper(stg);
return j;
}
//Poller metadata with jta Transaction
@Bean
public PollerMetadata pollerMetadata() throws NamingException {
return Pollers.fixedDelay(Long.valueOf(env.getProperty("poller.interval")))
.transactional(transactionManager).get();
}
使用 userTransaction 的 Jta 事务管理器:
@Bean
public PlatformTransactionManager transactionManager() throws NamingException {
Hashtable<String, String> properties = new Hashtable<>();
properties.put(Context.INITIAL_CONTEXT_FACTORY, env.getProperty(WebLogicConstant.JNDI_FACTORY));
properties.put(Context.PROVIDER_URL, env.getProperty(WebLogicConstant.JMS_WEBLOGIC_URL_SEND));
InitialContext vInitialContext = new InitialContext(properties);
UserTransaction xact = (UserTransaction) vInitialContext.lookup("javax.transaction.UserTransaction");
return new JtaTransactionManager(xact);
}
进程:
// Service Activator : Lunching the Jms Creation for each row
@Bean
@ServiceActivator(inputChannel = "jpaInputChannel")
public MessageHandler handler() {
return wlstoreMessage -> {
try {
jmsSenderService.
consumeMessage((List<StgOutJms>) wlstoreMessage.getPayload());
} catch (NamingException | JMSException e) {
log.error(e.getMessage(), e);
}
};
}
@Override
public void consumeMessage(List<StgOutJms> stgEntityList) throws NamingException, JMSException {
logger.info("JMS: Consume messages");
for (StgOutJms stgOutEntity : stgEntityList) {
if (nonNull(stgOutEntity) && nonNull(stgOutEntity.getIdentifiantUniqueLot())) {
sendMessage(stgOutEntity);
stgOutEntity.setStatus("SENT");
repositoryOut.save(stgOutEntity);
} else {
logger.error("The id of the object received is null");
}
}
}
Jms 连接:
@Override
public void initQueueConnection() throws NamingException, JMSException {
Hashtable<String, String> properties = new Hashtable<String, String>();
properties.put(Context.INITIAL_CONTEXT_FACTORY, env.getProperty(WebLogicConstant.JNDI_FACTORY));
properties.put(Context.PROVIDER_URL, env.getProperty(WebLogicConstant.JMS_WEBLOGIC_URL_SEND));
InitialContext vInitialContext = new InitialContext(properties);
QueueConnectionFactory vQueueConnectionFactory = (QueueConnectionFactory) vInitialContext
.lookup(env.getProperty(WebLogicConstant.JMS_FACTORY_SEND));
vQueueConnection = vQueueConnectionFactory.createQueueConnection();
vQueueConnection.start();
vQueueSession = vQueueConnection.createQueueSession(false, 0);
Queue vQueue = (Queue) vInitialContext.lookup(env.getProperty(WebLogicConstant.JMS_QUEUE_SEND));
vQueueSender = vQueueSession.createSender(vQueue);
}
此代码的问题在于 Jms 消息是在事务中发送的(成功时提交,失败时回滚)但发送的状态永远不会更新(crudrepository)。
此外,我尝试使用 jpaTransactionManager,它对数据库保存很有用,但是 Jms 消息是在事务提交之前发送的(失败时没有 jms 回滚)。
非常感谢您的帮助!
我不熟悉Weblogic,但在WebSphere 中我将DataSource 配置为XA,将JMS 连接工厂配置为XA。并使用 JNDI 的 XA 事务管理器将类似于您的调用包装到一个全局 JTA 事务中。
我也可以建议查看这篇文章如何避免 XA 事务:https://www.infoworld.com/article/2077963/distributed-transactions-in-spring--with-and-without-xa.html
经过几天的研究,我找到的唯一解决方案是将事务管理器传递给 jdbc 轮询器,并使用 JmsTemplate 和 session transacted = true,这样 jms 的 commit/rollback 和jdbc 是在同一笔交易中完成的。
// JMS Beans
@Bean
public JndiTemplate jndiTemplate() {
final Properties jndiProps = new Properties();
jndiProps.setProperty(Context.INITIAL_CONTEXT_FACTORY, env.getProperty(WebLogicConstant.JNDI_FACTORY));
jndiProps.setProperty(Context.PROVIDER_URL, env.getProperty(WebLogicConstant.JMS_WEBLOGIC_URL_SEND));
JndiTemplate jndiTemplate = new JndiTemplate();
jndiTemplate.setEnvironment(jndiProps);
return jndiTemplate;
}
@Bean
public JndiObjectFactoryBean queueConnectionFactory() {
JndiObjectFactoryBean queueConnectionFactory = new JndiObjectFactoryBean();
queueConnectionFactory.setJndiTemplate(jndiTemplate());
queueConnectionFactory.setJndiName(env.getProperty(WebLogicConstant.JMS_FACTORY_SEND));
return queueConnectionFactory;
}
@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate jmsTemplate = new JmsTemplate((ConnectionFactory) queueConnectionFactory().getObject());
jmsTemplate.setReceiveTimeout(500);
jmsTemplate.setSessionTransacted(true);
return jmsTemplate;
}
@Bean
public JndiObjectFactoryBean jmsQueueOut() {
JndiObjectFactoryBean jmsQueue = new JndiObjectFactoryBean();
jmsQueue.setJndiTemplate(jndiTemplate());
jmsQueue.setJndiName(env.getProperty(WebLogicConstant.JMS_QUEUE_SEND));
return jmsQueue;
}
通过事务管理器使用轮询器
@Bean
public PollerMetadata pollerMetadata() {
return Pollers
.fixedDelay(Long.valueOf(env.getProperty("poller.interval"))).transactional(transactionManager)
.get();
}
// DATABASE Poller using JdbcPollingChannelAdapter
@Bean
@InboundChannelAdapter(channel = "jpaInputChannel", poller = @Poller(value = "pollerMetadata"))
public MessageSource<?> jpaInbound() {
// Select request by status = 'TO_SEND'
JdbcPollingChannelAdapter poller = new JdbcPollingChannelAdapter(datasource,
StgOutJmsRepository.FIND_FILTER_BY_STATUS_SQL);
// RowMapper for mapping the list returned to the entity StgOutJms
poller.setRowMapper(new StgOutJms());
poller.setMaxRowsPerPoll(10);
return poller;
}
还有一个服务激活器
// Service Activator : Lunching the Jms Creation for each row
@Bean
@ServiceActivator(inputChannel = "jpaInputChannel")
public MessageHandler handler() {
return wlstoreMessage -> {
try {
jmsSenderService.consumeMessage((Destination) jmsQueueOut().getObject(),
(List<StgOutJms>) wlstoreMessage.getPayload());
} catch (NamingException | JMSException e) {
log.error(e.getMessage(), e);
}
};
}
希望对您有所帮助。