Spring 消息进入 errorChannel 时将 JMS 与 JTA 回滚集成
Spring Integration JMS with JTA rollback when message goes to errorChannel
我正在使用 Spring 通过 Atomikos 和 JMS 与入站和出站绑定到不同 Webshpere MQ 的 JTA 支持集成。
流程如下:
- JMS 入站通道适配器接收消息
- 一些转换
- 输出队列的 JMS 出站通道适配器
- 发生错误时,
errorChannel
收到消息
- 异常类型路由器将未处理的错误路由到自定义重新抛出服务,并将已处理的错误路由到将它们发送到 2 个错误队列的接收者列表路由器
我的问题是,即使消息在 errorChannel 下游(在已处理的异常情况下并到达错误队列),我也希望提交事务。
据我所知,只有在异常被重新抛出时才会发生回滚(这就是我重新抛出未处理的异常的原因),但在我的情况下,事务会在消息到达 errorChannel 时立即回滚(在它被路由到其他地方之前)。
我做错了什么?
配置如下。
<jms:inbound-channel-adapter id="jms-in"
destination="input-queue"
connection-factory="inConnectionFactory"
channel="edi-inbound"
acknowledge="transacted">
<poller max-messages-per-poll="${process.jms.inbound.poll.messages-per-poll:1}"
fixed-rate="${process.jms.inbound.poll.rate-millis:60000}"
>
<transactional timeout="${process.tx.timeout-sec:60}"/>
</poller>
</jms:inbound-channel-adapter>
<channel id="edi-inbound"/>
<chain input-channel="edi-inbound" output-channel="edi-transformation-chain">
<object-to-string-transformer/>
<service-activator ref="inbound" method="service"/>
</chain>
<!-- edifact transformation flow -->
<chain input-channel="edi-transformation-chain" output-channel="outbound-message-compose">
<transformer ref="edi2xml-converter"/>
<transformer ref="xml-mapper"/>
</chain>
<chain input-channel="outbound-message-compose" output-channel="outbound-channel">
<service-activator ref="outbound-message-composer" />
</chain>
<channel id="outbound-channel">
<interceptors>
<beans:ref bean="outbound-interceptor" />
</interceptors>
</channel>
<recipient-list-router input-channel="outbound-channel">
<recipient channel="file-outbound"/>
<recipient channel="queue-outbound"/>
</recipient-list-router>
<channel id="queue-outbound"/>
<jms:outbound-channel-adapter id="jms-out" destination="output-queue" channel="queue-outbound" connection-factory="outConnectionFactory"/>
<channel id="file-outbound"/>
<file:outbound-channel-adapter id="file-outbound"
directory="${output.directory}"
filename-generator-expression="headers['${application.header.key.messageid}'] + '_' + new java.util.Date().getTime() + '.xml'"
delete-source-files="true" />
<!-- notification outbound flow -->
<channel id="errorChannel">
<interceptors>
<wire-tap channel="logger"/>
</interceptors>
</channel>
<logging-channel-adapter id="logger" level="INFO"/>
<exception-type-router input-channel="errorChannel" default-output-channel="unhandled-error-channel">
<mapping exception-type="aero.aice.apidcm.integration.exception.HandledException" channel="error-notification-channel" />
</exception-type-router>
<recipient-list-router input-channel="error-notification-channel">
<recipient channel="queue-outbound-error"/>
<recipient channel="queue-inbound-error"/>
</recipient-list-router>
<chain input-channel="queue-outbound-error">
<service-activator ref="outbound-error-composer" />
<jms:outbound-channel-adapter id="jms-out-error"
destination="error-output-queue"
connection-factory="outConnectionFactory"
session-transacted="true"/>
</chain>
<chain input-channel="queue-inbound-error">
<service-activator ref="error-notif-composer" />
<jms:outbound-channel-adapter id="jms-in-error"
destination="error-input-queue"
connection-factory="outConnectionFactory"
session-transacted="true"/>
</chain>
<channel id="unhandled-error-channel" />
<service-activator ref="exception-rethrow" input-channel="unhandled-error-channel"/>
为了完整起见,当 tx 在错误通道上回滚时,两个错误队列在任何情况下都会收到消息(就好像出站适配器不会参与事务一样),以及正常流的 tx (当没有错误发生时)完美运行。
没错。
因为你使用Polling Inbound Channel Adapter
。它的逻辑是这样的:
AbstractPollingEndpoint.this.taskExecutor.execute(() -> {
...
if (!Poller.this.pollingTask.call()) {
break;
}
...
catch (Exception e) {
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
}
else {
throw new MessageHandlingException(new ErrorMessage(e), e);
}
}
}
});
您的 TX 是 pollingTask
代理的一部分,作为 AOP TransactionInterceptor
。
errorChannel
是 this.taskExecutor
的 ErrorHandler
的一部分。
因此,只有当我们从 pollingTask
抛出异常时,我们才能到达 errorChannel
。既然我们那里有 TX,它当然会回调。
我的观点是:Polling Inbound Channel Adapter
中的错误处理过程是在 TX 之外完成的。
考虑切换到 <int-jms:message-driven-channel-adapter>
。
我正在使用 Spring 通过 Atomikos 和 JMS 与入站和出站绑定到不同 Webshpere MQ 的 JTA 支持集成。 流程如下:
- JMS 入站通道适配器接收消息
- 一些转换
- 输出队列的 JMS 出站通道适配器
- 发生错误时,
errorChannel
收到消息 - 异常类型路由器将未处理的错误路由到自定义重新抛出服务,并将已处理的错误路由到将它们发送到 2 个错误队列的接收者列表路由器
我的问题是,即使消息在 errorChannel 下游(在已处理的异常情况下并到达错误队列),我也希望提交事务。 据我所知,只有在异常被重新抛出时才会发生回滚(这就是我重新抛出未处理的异常的原因),但在我的情况下,事务会在消息到达 errorChannel 时立即回滚(在它被路由到其他地方之前)。
我做错了什么?
配置如下。
<jms:inbound-channel-adapter id="jms-in"
destination="input-queue"
connection-factory="inConnectionFactory"
channel="edi-inbound"
acknowledge="transacted">
<poller max-messages-per-poll="${process.jms.inbound.poll.messages-per-poll:1}"
fixed-rate="${process.jms.inbound.poll.rate-millis:60000}"
>
<transactional timeout="${process.tx.timeout-sec:60}"/>
</poller>
</jms:inbound-channel-adapter>
<channel id="edi-inbound"/>
<chain input-channel="edi-inbound" output-channel="edi-transformation-chain">
<object-to-string-transformer/>
<service-activator ref="inbound" method="service"/>
</chain>
<!-- edifact transformation flow -->
<chain input-channel="edi-transformation-chain" output-channel="outbound-message-compose">
<transformer ref="edi2xml-converter"/>
<transformer ref="xml-mapper"/>
</chain>
<chain input-channel="outbound-message-compose" output-channel="outbound-channel">
<service-activator ref="outbound-message-composer" />
</chain>
<channel id="outbound-channel">
<interceptors>
<beans:ref bean="outbound-interceptor" />
</interceptors>
</channel>
<recipient-list-router input-channel="outbound-channel">
<recipient channel="file-outbound"/>
<recipient channel="queue-outbound"/>
</recipient-list-router>
<channel id="queue-outbound"/>
<jms:outbound-channel-adapter id="jms-out" destination="output-queue" channel="queue-outbound" connection-factory="outConnectionFactory"/>
<channel id="file-outbound"/>
<file:outbound-channel-adapter id="file-outbound"
directory="${output.directory}"
filename-generator-expression="headers['${application.header.key.messageid}'] + '_' + new java.util.Date().getTime() + '.xml'"
delete-source-files="true" />
<!-- notification outbound flow -->
<channel id="errorChannel">
<interceptors>
<wire-tap channel="logger"/>
</interceptors>
</channel>
<logging-channel-adapter id="logger" level="INFO"/>
<exception-type-router input-channel="errorChannel" default-output-channel="unhandled-error-channel">
<mapping exception-type="aero.aice.apidcm.integration.exception.HandledException" channel="error-notification-channel" />
</exception-type-router>
<recipient-list-router input-channel="error-notification-channel">
<recipient channel="queue-outbound-error"/>
<recipient channel="queue-inbound-error"/>
</recipient-list-router>
<chain input-channel="queue-outbound-error">
<service-activator ref="outbound-error-composer" />
<jms:outbound-channel-adapter id="jms-out-error"
destination="error-output-queue"
connection-factory="outConnectionFactory"
session-transacted="true"/>
</chain>
<chain input-channel="queue-inbound-error">
<service-activator ref="error-notif-composer" />
<jms:outbound-channel-adapter id="jms-in-error"
destination="error-input-queue"
connection-factory="outConnectionFactory"
session-transacted="true"/>
</chain>
<channel id="unhandled-error-channel" />
<service-activator ref="exception-rethrow" input-channel="unhandled-error-channel"/>
为了完整起见,当 tx 在错误通道上回滚时,两个错误队列在任何情况下都会收到消息(就好像出站适配器不会参与事务一样),以及正常流的 tx (当没有错误发生时)完美运行。
没错。
因为你使用Polling Inbound Channel Adapter
。它的逻辑是这样的:
AbstractPollingEndpoint.this.taskExecutor.execute(() -> {
...
if (!Poller.this.pollingTask.call()) {
break;
}
...
catch (Exception e) {
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
}
else {
throw new MessageHandlingException(new ErrorMessage(e), e);
}
}
}
});
您的 TX 是 pollingTask
代理的一部分,作为 AOP TransactionInterceptor
。
errorChannel
是 this.taskExecutor
的 ErrorHandler
的一部分。
因此,只有当我们从 pollingTask
抛出异常时,我们才能到达 errorChannel
。既然我们那里有 TX,它当然会回调。
我的观点是:Polling Inbound Channel Adapter
中的错误处理过程是在 TX 之外完成的。
考虑切换到 <int-jms:message-driven-channel-adapter>
。