Spring 与 AMQP 集成:在 error-channel 上获取原始消息的 custom-headers
Spring Integration with AMQP: get original message's custom-headers on error-channel
在我的项目中使用 Spring 与 RabbitMQ 的集成我遇到了问题。
该项目包括从 queue 接收消息、跟踪传入消息、使用 service-activator 处理消息以及跟踪服务激活器抛出的响应或异常。
这是示例配置:
<!-- inbound-gateway -->
<int-amqp:inbound-gateway id="inboundGateway"
request-channel="gatewayRequestChannel"
queue-names="myQueue"
connection-factory="rabbitMQConnectionFactory"
reply-channel="gatewayResponseChannel"
error-channel="gatewayErrorChannel"
error-handler="rabbitMQErrorHandler"
mapped-request-headers="traceId"
mapped-reply-headers="traceId" />
<!-- section to dispatch incoming messages to trace and execute service-activator -->
<int:publish-subscribe-channel id="gatewayRequestChannel" />
<int:bridge input-channel="gatewayRequestChannel" output-channel="traceChannel"/>
<int:bridge input-channel="gatewayRequestChannel" output-channel="serviceActivatorInputChannel"/>
<!-- the trace channel-->
<int:logging-channel-adapter id="traceChannel"
expression="headers['traceId'] + '= [Headers=' + headers + ', Payload=' + payload+']'" logger-name="my.logger" level="DEBUG" />
<!-- service activator which may throw an exception -->
<int:service-activator ref="myBean" method="myMethod" input-channel="serviceActivatorInputChannel" output-channel="serviceActivatorOutputChannel"/>
<!-- section to dispatch output-messages from service-activator to trace them and return them to the gateway -->
<int:publish-subscribe-channel id="serviceActivatorOutputChannel" />
<int:bridge input-channel="serviceActivatorOutputChannel"
output-channel="traceChannel" />
<int:bridge input-channel="serviceActivatorOutputChannel"
output-channel="gatewayResponseChannel" />
<!-- section to dispatch exceptions from service-activator to trace them and return them to the gateway -->
<int:bridge input-channel="gatewayErrorChannel"
output-channel="traceChannel" />
<int:bridge input-channel="gatewayErrorChannel"
output-channel="gatewayResponseChannel" />
我简化了代码以适合我的解释。这个想法是跟踪输入和 output/error 消息传入和传出 to/from service-activator。为此,我使用了一条名为 traceId 的消息 header。此标识符用作 correlation-identifier 以便能够将 request-message 与其响应相关联(这两个消息共享相同的 traceId 值)。
当 service-activator 没有抛出异常时一切正常。
但是当抛出异常时,网关似乎生成了一条新消息,没有我原来的 traceId header.
仔细查看网关代码,我在 class org.springframework.integration.gateway.MessagingGatewaySupport 中找到以下代码:
private Object doSendAndReceive(Object object, boolean shouldConvert) {
...
if (error != null) {
if (this.errorChannel != null) {
Message<?> errorMessage = new ErrorMessage(error);
Message<?> errorFlowReply = null;
try {
errorFlowReply = this.messagingTemplate.sendAndReceive(this.errorChannel, errorMessage);
}
...
}
似乎,当发生异常时,会创建一条新消息,将异常消息作为有效负载并发送到网关的errorChannel。在这里我失去了我的习惯 headers.
有没有办法在发生异常时保留我的custom-headers? (也许有一种配置它的方法,我可能会错过它......)。或者也许我没有以正确的方式实施我的流程。如果是这种情况,欢迎任何意见或建议。
对了,我用的是spring-integration-core神器的4.0.3.RELEASE版本
感谢您的回答
编辑:正如 Gary Russel 所说,此示例缺少以下 puslish/subscribe queue 配置
<int:publish-subscribe-channel id="gatewayErrorChannel"/>
error-channel
上的消息是 ErrorMessage
。它有两个属性:cause
- 原始异常和 failedMessage
- 失败时的消息。 ErrorMessage
没有得到 failedMessage
的 header。
您不能不做一些额外的工作就直接将 ErrorMessage
发送回网关。
通常,错误流会在返回响应之前对错误进行一些分析。
如果你想恢复一些自定义 header,你将需要一个 header 错误流的增强器。
类似
<int:header-enricher ...>
<int:header name="traceId" expression="payload.failedMessage.headers.traceId" />
</int:header-enricher>
此外,您的配置有点奇怪,因为您在 gatewayErrorChannel
上有 2 个订阅者。除非是 <publish-subscribe-channel/>
,否则这些消费者将获得备用消息;看起来你希望他们都能得到它,所以你需要正确声明频道。
在我的项目中使用 Spring 与 RabbitMQ 的集成我遇到了问题。 该项目包括从 queue 接收消息、跟踪传入消息、使用 service-activator 处理消息以及跟踪服务激活器抛出的响应或异常。 这是示例配置:
<!-- inbound-gateway -->
<int-amqp:inbound-gateway id="inboundGateway"
request-channel="gatewayRequestChannel"
queue-names="myQueue"
connection-factory="rabbitMQConnectionFactory"
reply-channel="gatewayResponseChannel"
error-channel="gatewayErrorChannel"
error-handler="rabbitMQErrorHandler"
mapped-request-headers="traceId"
mapped-reply-headers="traceId" />
<!-- section to dispatch incoming messages to trace and execute service-activator -->
<int:publish-subscribe-channel id="gatewayRequestChannel" />
<int:bridge input-channel="gatewayRequestChannel" output-channel="traceChannel"/>
<int:bridge input-channel="gatewayRequestChannel" output-channel="serviceActivatorInputChannel"/>
<!-- the trace channel-->
<int:logging-channel-adapter id="traceChannel"
expression="headers['traceId'] + '= [Headers=' + headers + ', Payload=' + payload+']'" logger-name="my.logger" level="DEBUG" />
<!-- service activator which may throw an exception -->
<int:service-activator ref="myBean" method="myMethod" input-channel="serviceActivatorInputChannel" output-channel="serviceActivatorOutputChannel"/>
<!-- section to dispatch output-messages from service-activator to trace them and return them to the gateway -->
<int:publish-subscribe-channel id="serviceActivatorOutputChannel" />
<int:bridge input-channel="serviceActivatorOutputChannel"
output-channel="traceChannel" />
<int:bridge input-channel="serviceActivatorOutputChannel"
output-channel="gatewayResponseChannel" />
<!-- section to dispatch exceptions from service-activator to trace them and return them to the gateway -->
<int:bridge input-channel="gatewayErrorChannel"
output-channel="traceChannel" />
<int:bridge input-channel="gatewayErrorChannel"
output-channel="gatewayResponseChannel" />
我简化了代码以适合我的解释。这个想法是跟踪输入和 output/error 消息传入和传出 to/from service-activator。为此,我使用了一条名为 traceId 的消息 header。此标识符用作 correlation-identifier 以便能够将 request-message 与其响应相关联(这两个消息共享相同的 traceId 值)。
当 service-activator 没有抛出异常时一切正常。 但是当抛出异常时,网关似乎生成了一条新消息,没有我原来的 traceId header.
仔细查看网关代码,我在 class org.springframework.integration.gateway.MessagingGatewaySupport 中找到以下代码:
private Object doSendAndReceive(Object object, boolean shouldConvert) {
...
if (error != null) {
if (this.errorChannel != null) {
Message<?> errorMessage = new ErrorMessage(error);
Message<?> errorFlowReply = null;
try {
errorFlowReply = this.messagingTemplate.sendAndReceive(this.errorChannel, errorMessage);
}
...
}
似乎,当发生异常时,会创建一条新消息,将异常消息作为有效负载并发送到网关的errorChannel。在这里我失去了我的习惯 headers.
有没有办法在发生异常时保留我的custom-headers? (也许有一种配置它的方法,我可能会错过它......)。或者也许我没有以正确的方式实施我的流程。如果是这种情况,欢迎任何意见或建议。
对了,我用的是spring-integration-core神器的4.0.3.RELEASE版本
感谢您的回答
编辑:正如 Gary Russel 所说,此示例缺少以下 puslish/subscribe queue 配置
<int:publish-subscribe-channel id="gatewayErrorChannel"/>
error-channel
上的消息是 ErrorMessage
。它有两个属性:cause
- 原始异常和 failedMessage
- 失败时的消息。 ErrorMessage
没有得到 failedMessage
的 header。
您不能不做一些额外的工作就直接将 ErrorMessage
发送回网关。
通常,错误流会在返回响应之前对错误进行一些分析。
如果你想恢复一些自定义 header,你将需要一个 header 错误流的增强器。
类似
<int:header-enricher ...>
<int:header name="traceId" expression="payload.failedMessage.headers.traceId" />
</int:header-enricher>
此外,您的配置有点奇怪,因为您在 gatewayErrorChannel
上有 2 个订阅者。除非是 <publish-subscribe-channel/>
,否则这些消费者将获得备用消息;看起来你希望他们都能得到它,所以你需要正确声明频道。