使用 Jdbc XA 和 Jms 的 JtaTransaction

JtaTransaction with Jdbc XA and Jms

所以,我有一个 spring 引导项目,我需要在其中执行此操作:

服务器是 Weblogic,具有处理行的 XA 数据源、Jms 的 XA 工厂、jndi 上下文和 spring 集成轮询器 (jdbcpollingchaneladapter) 和 jta 事务:

如在此 doc 中所见,为此,我必须将 JtaTransaction 与 userTransaction 一起使用,并创建一个非事务性 Jms 会话

   // DATABASE Poller using JdbcPollingChannelAdapter
    @Bean
    @InboundChannelAdapter(channel = "jpaInputChannel", poller = @Poller(value = "pollerMetadata"))
    public MessageSource<?> jpaInbound() {
        // Select request by status = 'TO_SEND'
        JdbcPollingChannelAdapter j = new JdbcPollingChannelAdapter(datasource,
                StgOutJmsRepository.FIND_FILTER_BY_STATUS_SQL);
        StgOutJms stg = new StgOutJms();
        j.setRowMapper(stg);
        return j;
    }

     //Poller metadata with jta Transaction
     @Bean
        public PollerMetadata pollerMetadata() throws  NamingException   {
            return Pollers.fixedDelay(Long.valueOf(env.getProperty("poller.interval")))
                    .transactional(transactionManager).get();
        }

使用 userTransaction 的 Jta 事务管理器:

    @Bean
    public PlatformTransactionManager transactionManager() throws NamingException {
        Hashtable<String, String> properties = new Hashtable<>();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, env.getProperty(WebLogicConstant.JNDI_FACTORY));
        properties.put(Context.PROVIDER_URL, env.getProperty(WebLogicConstant.JMS_WEBLOGIC_URL_SEND));
        InitialContext vInitialContext = new InitialContext(properties);    
        UserTransaction xact = (UserTransaction) vInitialContext.lookup("javax.transaction.UserTransaction");       
        return new JtaTransactionManager(xact);
    }

进程:

// Service Activator : Lunching the Jms Creation for each row
    @Bean
    @ServiceActivator(inputChannel = "jpaInputChannel")
    public MessageHandler handler() {

        return wlstoreMessage -> {
            try {
                                 
                jmsSenderService.
                consumeMessage((List<StgOutJms>) wlstoreMessage.getPayload());
            } catch (NamingException | JMSException e) {
                log.error(e.getMessage(), e);
            }

        };
    }

    @Override
    public void consumeMessage(List<StgOutJms> stgEntityList) throws NamingException, JMSException {
            logger.info("JMS: Consume messages");
    
            for (StgOutJms stgOutEntity : stgEntityList) {
                if (nonNull(stgOutEntity) && nonNull(stgOutEntity.getIdentifiantUniqueLot())) {
                  
                    sendMessage(stgOutEntity);
                    stgOutEntity.setStatus("SENT");
                    repositoryOut.save(stgOutEntity);
                } else {
                    logger.error("The id  of the object received is null");
                }
            }
        }

Jms 连接:

    @Override
    public void initQueueConnection() throws NamingException, JMSException {

        Hashtable<String, String> properties = new Hashtable<String, String>();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, env.getProperty(WebLogicConstant.JNDI_FACTORY));
        properties.put(Context.PROVIDER_URL, env.getProperty(WebLogicConstant.JMS_WEBLOGIC_URL_SEND));
        InitialContext vInitialContext = new InitialContext(properties);

        QueueConnectionFactory vQueueConnectionFactory = (QueueConnectionFactory) vInitialContext
                .lookup(env.getProperty(WebLogicConstant.JMS_FACTORY_SEND));

        vQueueConnection = vQueueConnectionFactory.createQueueConnection();
        vQueueConnection.start();

        vQueueSession = vQueueConnection.createQueueSession(false, 0);

        Queue vQueue = (Queue) vInitialContext.lookup(env.getProperty(WebLogicConstant.JMS_QUEUE_SEND));

        vQueueSender = vQueueSession.createSender(vQueue);
    }

此代码的问题在于 Jms 消息是在事务中发送的(成功时提交,失败时回滚)但发送的状态永远不会更新(crudrepository)。

此外,我尝试使用 jpaTransactionManager,它对数据库保存很有用,但是 Jms 消息是在事务提交之前发送的(失败时没有 jms 回滚)。

非常感谢您的帮助!

我不熟悉Weblogic,但在WebSphere 中我将DataSource 配置为XA,将JMS 连接工厂配置为XA。并使用 JNDI 的 XA 事务管理器将类似于您的调用包装到一个全局 JTA 事务中。

我也可以建议查看这篇文章如何避免 XA 事务:https://www.infoworld.com/article/2077963/distributed-transactions-in-spring--with-and-without-xa.html

经过几天的研究,我找到的唯一解决方案是将事务管理器传递给 jdbc 轮询器,并使用 JmsTemplate 和 session transacted = true,这样 jms 的 commit/rollback 和jdbc 是在同一笔交易中完成的。

  // JMS Beans
    @Bean
    public JndiTemplate jndiTemplate() {

        final Properties jndiProps = new Properties();
        jndiProps.setProperty(Context.INITIAL_CONTEXT_FACTORY, env.getProperty(WebLogicConstant.JNDI_FACTORY));
        jndiProps.setProperty(Context.PROVIDER_URL, env.getProperty(WebLogicConstant.JMS_WEBLOGIC_URL_SEND));

        JndiTemplate jndiTemplate = new JndiTemplate();
        jndiTemplate.setEnvironment(jndiProps);

        return jndiTemplate;
    }

    @Bean
    public JndiObjectFactoryBean queueConnectionFactory() {
        JndiObjectFactoryBean queueConnectionFactory = new JndiObjectFactoryBean();
        queueConnectionFactory.setJndiTemplate(jndiTemplate());
        queueConnectionFactory.setJndiName(env.getProperty(WebLogicConstant.JMS_FACTORY_SEND));

        return queueConnectionFactory;
    }

    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate((ConnectionFactory) queueConnectionFactory().getObject());
        jmsTemplate.setReceiveTimeout(500);
        jmsTemplate.setSessionTransacted(true);
        return jmsTemplate;
    }

 @Bean
    public JndiObjectFactoryBean jmsQueueOut() {
        JndiObjectFactoryBean jmsQueue = new JndiObjectFactoryBean();
        jmsQueue.setJndiTemplate(jndiTemplate());
        jmsQueue.setJndiName(env.getProperty(WebLogicConstant.JMS_QUEUE_SEND));

        return jmsQueue;
    }

通过事务管理器使用轮询器

@Bean
    public PollerMetadata pollerMetadata() {
        return Pollers
                .fixedDelay(Long.valueOf(env.getProperty("poller.interval"))).transactional(transactionManager)
                .get();
    }

    // DATABASE Poller using JdbcPollingChannelAdapter
    @Bean
    @InboundChannelAdapter(channel = "jpaInputChannel", poller = @Poller(value = "pollerMetadata"))
    public MessageSource<?> jpaInbound() {

        // Select request by status = 'TO_SEND'
        JdbcPollingChannelAdapter poller = new JdbcPollingChannelAdapter(datasource,
                StgOutJmsRepository.FIND_FILTER_BY_STATUS_SQL);
        // RowMapper for mapping the list returned to the entity StgOutJms
        poller.setRowMapper(new StgOutJms());
        poller.setMaxRowsPerPoll(10);
        return poller;
    }

还有一个服务激活器

 // Service Activator : Lunching the Jms Creation for each row
    @Bean
    @ServiceActivator(inputChannel = "jpaInputChannel")
    public MessageHandler handler() {

        return wlstoreMessage -> {
            try {

                jmsSenderService.consumeMessage((Destination) jmsQueueOut().getObject(),
                        (List<StgOutJms>) wlstoreMessage.getPayload());
            } catch (NamingException | JMSException e) {
                log.error(e.getMessage(), e);
            }

        };
    }

希望对您有所帮助。