Spring 集成的 DefaultMessageListenerContainer 和 JPA 中的事务
Transactions in Spring Integration's DefaultMessageListenerContainer and JPA
假设我有一个简单的 IntegrationFlow
,它以异步方式读取 JMS 消息、转换、应用业务逻辑等。
@Bean
public IntegrationFlow upstreamEventFlow() {
return IntegrationFlows.from(
Jms.messageDrivenChannelAdapter(jmsConnectionFactory)
.configureListenerContainer(container -> container.destinationResolver(destinationResolver))
.destination("myQueue")
.get()
)
.transform(xmlToObjectTransformer)
.transform(upstreamTransformer)
.handle(evaluationHandler)
.transform(objectToXmlTransformer)
.channel(downstreamEventChannel)
.get();
}
evaluationHandler 是 GenericHandler
,它使用 JPA 从数据库中获取数据并应用一些业务逻辑
我的问题与 Spring 和 Spring 集成中的交易有关
当我使用以下记录器启用记录时:
<logger name="org.springframework.transaction" level="trace"/>
<logger name="org.springframework.jms.connection.JmsTransactionManager" level="debug"/>
<logger name="org.springframework.orm.jpa.JpaTransactionManager" level="debug"/>
<logger name="org.springframework.orm.hibernate5.HibernateTransactionManager" level="debug"/>
<logger name="org.springframework.jdbc.datasource.DataSourceTransactionManager" level="debug"/>
<logger name="org.springframework.integration" level="debug"/>
<logger name="org.springframework.messaging" level="debug"/>
<logger name="org.springframework.jms" level="debug"/>
我可以看到这一切都是从 JPA 事务开始的,而不是 Spring 集成创建的事务,或者至少是 JMS
来自日志:
o.s.orm.jpa.JpaTransactionManager : Creating new transaction with name ...: PROPAGATION_REQUIRED,ISOLATION_DEFAULT; ''
o.s.orm.jpa.JpaTransactionManager : Opened new EntityManager ... for JPA transaction
o.s.orm.jpa.JpaTransactionManager : Exposing JPA transaction as JDBC transaction ...
这是为什么?
您能否详细说明事务在 DefaultMessageListenerContainer
中的工作原理以及它如何与 JMS 事务、JMS 事务处理会话、JPA 事务等相关联?
事务边界在哪里?
事务会话和在DefaultMessageListenerContainer
中使用事务管理器有什么区别?
如果我在处理程序中抛出 RuntimeException
,我可以看到它被回滚并看到消息 Execution of JMS message listener failed, and no ErrorHandler has been set.
并且 activemq 代理多次重新传送消息
我应该设置ErrorHandler
吗?
最好的猜测是你的一颗豆子,例如evaluationHandler
或方法,注释为 @Transactional
。那是 JPA 事务开始的时候。当该方法退出时,事务将提交(或回滚)。
由于默认情况下处理 MD 通道适配器,因此 JMS(本地)事务包装整个流程并且在整个流程完成(或中止)之前不会提交(或回滚)。
因此,这两个事务在任何方面都没有真正同步;他们是独立的。
如果将 JPA 事务管理器注入适配器的侦听器容器,框架将提供 "Best Efforts 1PC" 事务同步 - 请参阅 Dave Syer's Javaworld Article "Distributed transactions in Spring, with and without XA"。
JPA 事务将由容器启动,两个事务将背靠背提交(数据库优先),有效地同步它们。但这不是真正的 JTA,并且在 JMS Tx 回滚时 DB Tx 提交的可能性很小 - 因此您必须处理重复交付的可能性。
假设我有一个简单的 IntegrationFlow
,它以异步方式读取 JMS 消息、转换、应用业务逻辑等。
@Bean
public IntegrationFlow upstreamEventFlow() {
return IntegrationFlows.from(
Jms.messageDrivenChannelAdapter(jmsConnectionFactory)
.configureListenerContainer(container -> container.destinationResolver(destinationResolver))
.destination("myQueue")
.get()
)
.transform(xmlToObjectTransformer)
.transform(upstreamTransformer)
.handle(evaluationHandler)
.transform(objectToXmlTransformer)
.channel(downstreamEventChannel)
.get();
}
evaluationHandler 是 GenericHandler
,它使用 JPA 从数据库中获取数据并应用一些业务逻辑
我的问题与 Spring 和 Spring 集成中的交易有关
当我使用以下记录器启用记录时:
<logger name="org.springframework.transaction" level="trace"/>
<logger name="org.springframework.jms.connection.JmsTransactionManager" level="debug"/>
<logger name="org.springframework.orm.jpa.JpaTransactionManager" level="debug"/>
<logger name="org.springframework.orm.hibernate5.HibernateTransactionManager" level="debug"/>
<logger name="org.springframework.jdbc.datasource.DataSourceTransactionManager" level="debug"/>
<logger name="org.springframework.integration" level="debug"/>
<logger name="org.springframework.messaging" level="debug"/>
<logger name="org.springframework.jms" level="debug"/>
我可以看到这一切都是从 JPA 事务开始的,而不是 Spring 集成创建的事务,或者至少是 JMS
来自日志:
o.s.orm.jpa.JpaTransactionManager : Creating new transaction with name ...: PROPAGATION_REQUIRED,ISOLATION_DEFAULT; ''
o.s.orm.jpa.JpaTransactionManager : Opened new EntityManager ... for JPA transaction
o.s.orm.jpa.JpaTransactionManager : Exposing JPA transaction as JDBC transaction ...
这是为什么?
您能否详细说明事务在 DefaultMessageListenerContainer
中的工作原理以及它如何与 JMS 事务、JMS 事务处理会话、JPA 事务等相关联?
事务边界在哪里?
事务会话和在DefaultMessageListenerContainer
中使用事务管理器有什么区别?
如果我在处理程序中抛出 RuntimeException
,我可以看到它被回滚并看到消息 Execution of JMS message listener failed, and no ErrorHandler has been set.
并且 activemq 代理多次重新传送消息
我应该设置ErrorHandler
吗?
最好的猜测是你的一颗豆子,例如evaluationHandler
或方法,注释为 @Transactional
。那是 JPA 事务开始的时候。当该方法退出时,事务将提交(或回滚)。
由于默认情况下处理 MD 通道适配器,因此 JMS(本地)事务包装整个流程并且在整个流程完成(或中止)之前不会提交(或回滚)。
因此,这两个事务在任何方面都没有真正同步;他们是独立的。
如果将 JPA 事务管理器注入适配器的侦听器容器,框架将提供 "Best Efforts 1PC" 事务同步 - 请参阅 Dave Syer's Javaworld Article "Distributed transactions in Spring, with and without XA"。
JPA 事务将由容器启动,两个事务将背靠背提交(数据库优先),有效地同步它们。但这不是真正的 JTA,并且在 JMS Tx 回滚时 DB Tx 提交的可能性很小 - 因此您必须处理重复交付的可能性。