Spring 使用 TransactionManager(JMS、数据库)的多个事务

Spring multiple transactions with TransactionManager (JMS, database)

我有一个方法可以执行两个事务:一个使用 DB,一个使用 JMS。我希望一个接一个地做出承诺。我正在尝试为此使用 PlatformTransactionManager 。有两种方法:使用 TransactionTemplateDefaultTransactionDefinition。但是我没有找到一个多次使用的例子。我想做的是:

void do(){
 T dbTransaction = ...; // here goes: new TransactionTemplate(transactionManager) two times
 T jmsTransaction = ...; // or: new DefaultTransactionDefinition() and transactionManager.getTransaction(definition); two times
 saveDb();
 sendJms();
 dbTransaction.commit();
 jmsTransaction.commit();
}

但我不确定使用什么以及如何使用,因为在 this article 中它说:

Anyway, once we create a TransactionTemplate with a configuration, all transactions will use that configuration to execute. So, if we need multiple configurations, we should create multiple template instances.

那么如何正确地创建两个事务并一个接一个地关闭呢?我应该创建两个单独的 definitions 还是可以重复使用一个?我可以在两个 templates 中重复使用相同的 transactionManager 吗?我知道数据库有一个 @Transcational 注释,我也可以将 JMS 配置为使用事务处理,但是:

  1. 我没有找到如何配置 JMS 以使用事务的好示例
  2. 我不确定他们会关闭哪个订单

所以我想手动执行此操作。我也不确定此手动事务是否适用于 JMS(例如 IBM-MQ),因为我只看到了数据库事务的示例。

目前还不清楚为什么您希望在这种特定情况下使用 JMS 事务,我什至反对它 - 至少正如您在上面介绍的那样。

您基本上想要在状态成功存储到数据库后发布一条消息。

既然您的真实来源是数据库,为什么不将所有后续操作都基于该操作的成功完成呢?

例如,构建它的一种方法类似于(Spring-oriented,因为您已经提到您正在使用它):

  1. 创建一个范围为当前事务的 JmsSender bean。这可以通过实施 BeanFactoryPostProcessor 并执行类似以下操作来完成:
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        SimpleTransactionScope transactionScope = new SimpleTransactionScope();
        // The scope exists, but is not registered by Spring by default.
        beanFactory.registerScope("transaction", transactionScope);
    }

    // in a separate configuration class defining your JmsSender bean
    @Bean
    @Scope("transaction")
    public JmsSender jmsSender() { return new JmsSender(); }
  1. 每次调用此 bean 的 send() 方法时,都会向内部队列添加一条消息。这通常是 ThreadLocal<List<T>> - 事实上,Spring 以几乎相同的方式处理事务管理。
  2. 创建一个 AfterCommitJmsPublisher bean,它是一个 TransactionSynchronizationAdapter - 这意味着我们希望在提交时有额外的行为。
  3. 注册 AfterCommitJmsPublisher。这意味着在交易之前调用 TransactionSynchronizationManager.registerSynchronization(jmsPublisher)。一种方法是使用例如方面、声明式事务管理 (@Transactional) 和 Spring AOP 将是:
@Aspect
@Component
public class AfterCommitJmsPublisher extends TransactionSynchronizationAdapter {

    private final JmsPublisher;

    @Pointcut("@annotation(org.springframework.transaction.annotation.Transactional)")
    private void transactionalPointcut() {
    }

    @Before("transactionalPointcut()")
    public void registerTransactionSynchronization() {
        TransactionSynchronizationManager.registerSynchronization(this);
    }
  1. 提交数据库事务后,调用 jmsPublisher.publish()。这可以在 TransactionSynchronizationAdapter:
  2. afterCommit() 方法中完成
    // In AfterCommitJmsPublisher
    @Override
    public void afterCommit() {
        jmsPublisher.publish();
    }
  1. 如果事务被回滚,则调用 jmsPublisher.clear()。您可能不想发布任何有关失败操作的消息。

这样,您的 JMS 消息总是绑定到它们源自的事务 - 如果数据库事务失败,则不会发送任何消息。

离开你的评论:

In the manual scenario it will fail if JMS failed, but will save it to DB if there's no exception with JMS, and even if after that transaction with JMS will fail I'm ok with that because I'm saving the state in DB.

这可能足以满足您的要求。但是,您可能需要考虑到您拥有的组件越多,您的系统需要的容错能力就越高,并考虑到可能无法使用的外部服务。

这可能意味着将 JMS 消息保存在一个特殊的数据库中 table(作为事务的一部分!)并且仅在成功提交后发布,在成功发布后删除保存的消息。如果不成功,您可以执行一个管家任务来重新尝试发布您的消息。

最后,关于分布式事务的一句话:我个人建议尽可能不要使用它们,尤其是对于您当前的用例。它们是复杂的野兽,几乎肯定会影响您的应用程序的可用性并增加事务中涉及的所有进程的端到端延迟。 Saga pattern 这样的东西通常更适合分布式系统。

当然,这可能不适用于您的用例,并且您的一致性要求可能超过任何可用性要求,因此请持保留态度。

您的用例简单而常见。您希望发送一条 JMS 消息,等待它完成,然后提交到数据库。这是在两个事务上完成的——一个用于 JMS 消息,另一个用于数据库。这些事务都存在于单个事务上下文中。当您启动 JMS 事务时,将不存在事务上下文,因此将创建一个。当您启动数据库事务时,它会加入现有的事务上下文。这些事务将同步,因为 JMS 事务必须在提交数据库事务之前成功完成。

此操作的核心是事务管理器。查看您链接的文章,他们对 PlatformTransactionManager 进行了多次引用。在您的用例中,PlatformTransactionManager 必须是支持 JTA 的事务管理器。 JTA 事务管理器将能够创建事务上下文并注册和同步事务。

请注意,这是两个本地事务,这绝不是 XA 或分布式事务。在这种情况下,如果 JMS 本地事务失败,那么数据库本地事务将被回滚。更具体地说,事务上下文被标记为仅回滚。如果发生任何未处理的异常,则事务上下文被标记为仅回滚。对本地事务调用 commit() 的任何尝试都将失败,并显示一条消息,指出事务上下文仅回滚。

实现这一点取决于平台。例如,如果您的 Spring 项目部署在 JBoss 等应用程序服务器上的 WAR 文件中,那么 PlatformTransactionManager 将自动自动装配。如果您使用 Spring 引导,那么大多数配置甚至不包括事务管理器。

我有一个用于 Spring 引导 here 的事务性 Spring JMS 和 Camel。这是 IBM MQ 的简单消息桥。如果不出意外,Spring JMS 和注释以及事务性 IBM MQ 的示例应该有用。也许 Camel 钻头也很有用。

请注意 pom.xml 文件包含:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jta-narayana</artifactId>
    </dependency>

此 Spring 启动器将安装和配置 Arjuna JTA 事务管理器作为 PlatformTransactionManager

在我的示例中,我有:

<logger name="com.arjuna" level="TRACE" additivity="false">
    <appender-ref ref="STDOUT" />
</logger>

这为 Arjuna JTA 事务管理器提供了非常好的日志记录。

最重要的是,获取配置为 PlatformTransactionManager 的 JTA 事务管理器。使用它来创建事务上下文,并在该上下文中拥有两个本地同步事务。

示例项目应该很容易得到运行。日志记录非常有用。