Mule 事务范围回滚似乎失败
Mule transactional scope rollback appears to fail
编辑:Mule 3.4.1
我们有一个 Mule 流,它交替从一个数据库读取和插入到另一个数据库,所有这些都包含在一个事务范围内。在这种特殊情况下,其中一个后来的插入失败了,我们希望一切都回滚。
当我们查看日志时,我们看到第二次插入(即下例中的 BulkInsertInstanceToCache)的异常(例如,重复的 PRIMARY KEY)。当我们查看数据库时,我们会看到第一次插入的数据(下例中的 BulkInsertActivityToCache)。我们预计所有数据都会消失。
我们是否为我们想要的行为错误地配置了这个范围?
这是一个代码示例,我将其缩减为仅插入两个内容以显示正在完成的处理类型。
<flow name="ProcessBulkUpdateCache" processingStrategy="synchronous" doc:name="ProcessBulkUpdateCache">
<transactional action="ALWAYS_BEGIN" doc:name="Transactional">
<jdbc-ee:outbound-endpoint exchange-pattern="request-response"
queryKey="GetActivitiesForCache" queryTimeout="-1" connector-ref="SumTotalDatabase">
<jdbc-ee:transaction action="NONE" />
</jdbc-ee:outbound-endpoint>
<jdbc-ee:outbound-endpoint exchange-pattern="request-response"
queryKey="BulkInsertActivityToCache" queryTimeout="-1" connector-ref="EAIServiceDatabase">
</jdbc-ee:outbound-endpoint>
<jdbc-ee:outbound-endpoint exchange-pattern="request-response"
queryKey="GetInstancesForCache" queryTimeout="-1" connector-ref="SumTotalDatabase">
<jdbc-ee:transaction action="NONE" />
</jdbc-ee:outbound-endpoint>
<jdbc-ee:outbound-endpoint exchange-pattern="request-response"
queryKey="BulkInsertInstanceToCache" queryTimeout="-1" connector-ref="EAIServiceDatabase">
</jdbc-ee:outbound-endpoint>
</transactional>
<catch-exception-strategy doc:name="Unexpected">
...etc.
</catch-exception-strategy>
</flow>
Edit 我尝试在第一个 INSERT 中添加一个 BEGIN_OR_JOIN 事务元素,在第二个 INSERT 中添加一个 ALWAYS_JOIN 事务元素,但是代码随后抛出异常当它到达第二个时,没有打开的交易可以加入。
分别使用 ALWAYS_BEGIN
和 ALWAYS_JOIN
是正确的方法。
但是,如果是两个不同的数据库,则需要使用 XA 事务。本地事务无法从两个不同的数据库注册资源。
据我所知,第一个出站应该始终开始,之后每个出站都应该始终开始
<db:mysql-config name="MySQL_Configuration" host="localhost" port="3306" user="root" database="ib_trade" useXaTransactions="true" driverClassName="com.mysql.jdbc.Driver" doc:name="MySQL Configuration"/>
<flow name="transactonmanagerFlow">
<http:listener config-ref="HTTP_Listener_Configuration" path="/ram" doc:name="HTTP"/>
<logger level="INFO" doc:name="Logger" message="%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%"/>
<set-payload value="#['hi']" doc:name="Set Payload"/>
<vm:outbound-endpoint exchange-pattern="request-response" path="temp" connector-ref="VM" doc:name="VM">
<xa-transaction action="ALWAYS_JOIN" timeout="10000"/>
</vm:outbound-endpoint>
<logger level="INFO" doc:name="Logger" message="^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^"/>
</flow>
<flow name="transactonmanagerFlow1">
<vm:inbound-endpoint exchange-pattern="request-response" path="temp" connector-ref="VM" doc:name="VM">
<xa-transaction action="ALWAYS_BEGIN" timeout="100000"/>
</vm:inbound-endpoint>
<logger level="INFO" doc:name="Logger" message="**************************************************************************"/>
<db:insert config-ref="MySQL_Configuration" transactionalAction="ALWAYS_JOIN" doc:name="Database">
<db:dynamic-query><![CDATA[INSERT INTO `ib_trade`.`swt_symbol`(`idswt_symbol`,`symbol_name`,`symbol_exchange`,`symbol_id`) VALUES ("5","1","1","1");]]></db:dynamic-query>
</db:insert>
<logger message="^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$" level="INFO" doc:name="Logger"/>
<db:insert config-ref="MySQL_Configuration" transactionalAction="ALWAYS_JOIN" doc:name="Copy_of_Database">
<db:dynamic-query><![CDATA[INSERT INTO `ib_trade`.`swt_symbol`(`idswt_symbol`,`symbol_name`,`symbol_exchange`,`symbol_id`) VALUES ("5","temp","1","1");]]></db:dynamic-query>
</db:insert>
编辑:Mule 3.4.1
我们有一个 Mule 流,它交替从一个数据库读取和插入到另一个数据库,所有这些都包含在一个事务范围内。在这种特殊情况下,其中一个后来的插入失败了,我们希望一切都回滚。
当我们查看日志时,我们看到第二次插入(即下例中的 BulkInsertInstanceToCache)的异常(例如,重复的 PRIMARY KEY)。当我们查看数据库时,我们会看到第一次插入的数据(下例中的 BulkInsertActivityToCache)。我们预计所有数据都会消失。
我们是否为我们想要的行为错误地配置了这个范围?
这是一个代码示例,我将其缩减为仅插入两个内容以显示正在完成的处理类型。
<flow name="ProcessBulkUpdateCache" processingStrategy="synchronous" doc:name="ProcessBulkUpdateCache">
<transactional action="ALWAYS_BEGIN" doc:name="Transactional">
<jdbc-ee:outbound-endpoint exchange-pattern="request-response"
queryKey="GetActivitiesForCache" queryTimeout="-1" connector-ref="SumTotalDatabase">
<jdbc-ee:transaction action="NONE" />
</jdbc-ee:outbound-endpoint>
<jdbc-ee:outbound-endpoint exchange-pattern="request-response"
queryKey="BulkInsertActivityToCache" queryTimeout="-1" connector-ref="EAIServiceDatabase">
</jdbc-ee:outbound-endpoint>
<jdbc-ee:outbound-endpoint exchange-pattern="request-response"
queryKey="GetInstancesForCache" queryTimeout="-1" connector-ref="SumTotalDatabase">
<jdbc-ee:transaction action="NONE" />
</jdbc-ee:outbound-endpoint>
<jdbc-ee:outbound-endpoint exchange-pattern="request-response"
queryKey="BulkInsertInstanceToCache" queryTimeout="-1" connector-ref="EAIServiceDatabase">
</jdbc-ee:outbound-endpoint>
</transactional>
<catch-exception-strategy doc:name="Unexpected">
...etc.
</catch-exception-strategy>
</flow>
Edit 我尝试在第一个 INSERT 中添加一个 BEGIN_OR_JOIN 事务元素,在第二个 INSERT 中添加一个 ALWAYS_JOIN 事务元素,但是代码随后抛出异常当它到达第二个时,没有打开的交易可以加入。
分别使用 ALWAYS_BEGIN
和 ALWAYS_JOIN
是正确的方法。
但是,如果是两个不同的数据库,则需要使用 XA 事务。本地事务无法从两个不同的数据库注册资源。
据我所知,第一个出站应该始终开始,之后每个出站都应该始终开始
<db:mysql-config name="MySQL_Configuration" host="localhost" port="3306" user="root" database="ib_trade" useXaTransactions="true" driverClassName="com.mysql.jdbc.Driver" doc:name="MySQL Configuration"/>
<flow name="transactonmanagerFlow">
<http:listener config-ref="HTTP_Listener_Configuration" path="/ram" doc:name="HTTP"/>
<logger level="INFO" doc:name="Logger" message="%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%"/>
<set-payload value="#['hi']" doc:name="Set Payload"/>
<vm:outbound-endpoint exchange-pattern="request-response" path="temp" connector-ref="VM" doc:name="VM">
<xa-transaction action="ALWAYS_JOIN" timeout="10000"/>
</vm:outbound-endpoint>
<logger level="INFO" doc:name="Logger" message="^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^"/>
</flow>
<flow name="transactonmanagerFlow1">
<vm:inbound-endpoint exchange-pattern="request-response" path="temp" connector-ref="VM" doc:name="VM">
<xa-transaction action="ALWAYS_BEGIN" timeout="100000"/>
</vm:inbound-endpoint>
<logger level="INFO" doc:name="Logger" message="**************************************************************************"/>
<db:insert config-ref="MySQL_Configuration" transactionalAction="ALWAYS_JOIN" doc:name="Database">
<db:dynamic-query><![CDATA[INSERT INTO `ib_trade`.`swt_symbol`(`idswt_symbol`,`symbol_name`,`symbol_exchange`,`symbol_id`) VALUES ("5","1","1","1");]]></db:dynamic-query>
</db:insert>
<logger message="^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$" level="INFO" doc:name="Logger"/>
<db:insert config-ref="MySQL_Configuration" transactionalAction="ALWAYS_JOIN" doc:name="Copy_of_Database">
<db:dynamic-query><![CDATA[INSERT INTO `ib_trade`.`swt_symbol`(`idswt_symbol`,`symbol_name`,`symbol_exchange`,`symbol_id`) VALUES ("5","temp","1","1");]]></db:dynamic-query>
</db:insert>