如果在 DSL 中的出站网关中抛出错误,则路由到错误通道

Routing to an error channel if fault is thrown in outbound gateway in DSL

我有以下 Spring 集成 JAva DSL 代码:

    return flow -> flow.channel(INPUT_CHANNEL)
            .transform(customMapper, "mapFrom")
            .enrichHeaders(polarisPreCompSoapActionHeader())
            .route("headers.key",
                    subflowMapping -> subflowMapping
                            .subFlowMapping("value1", subflow -> subflow
                                    .handle(webserviceOutboundGateway,
                                            e -> e.advice(skipAdvice()))
                            )
            )
            .channel(OUTPUT_CHANNEL);

@Bean
public Advice skipAdvice() {
    ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setFailureChannel(errorChannel());
    return advice;
}

webserviceOutboundGateway 创建为 MarshallingWebServiceOutboundGateway 的 bean。

我想要实现的是在 SOAP 错误到达时将错误消息路由到错误通道。我认为我可以向处理程序添加一个建议,这个建议应该是一个 ExpressionEvaluatingRequestHandlerAdvice,我可以在其中设置失败通道。因此,每当在出站网关中抛出错误时,错误消息将被转发到错误通道。

然后问题是现在网关抛出异常并且流停止

2020-05-20 22:21:48,161 INFO  com.acme.Timing Thread=qtp14486859-12 MDC=16d7cc4c-c9da-449b-8bfa-504e6d81185d REQUEST COMPLETE : time[39]ms  message[ProcessTran] endpoint[http://localhost:8081/system-integration-service/service]
2020-05-20 22:21:48,161 DEBUG org.springframework.integration.transformer.ContentEnricher$Gateway Thread=qtp14486859-12 MDC=16d7cc4c-c9da-449b-8bfa-504e6d81185d failure occurred in gateway sendAndReceive: error occurred in message handler [webserviceOutboundGateway]; nested exception is org.springframework.ws.client.WebServiceTransportException: Not Found [404]
2020-05-20 22:21:48,162 DEBUG org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway Thread=qtp14486859-12 MDC=16d7cc4c-c9da-449b-8bfa-504e6d81185d failure occurred in gateway sendAndReceive: error occurred in message handler [webserviceOutboundGateway]; nested exception is org.springframework.ws.client.WebServiceTransportException: Not Found [404]
2020-05-20 22:21:48,167 ERROR com.acme.webservice.OrchestrationServiceEndpoint Thread=qtp14486859-12 MDC=16d7cc4c-c9da-449b-8bfa-504e6d81185d Error
org.springframework.ws.client.WebServiceTransportException: Not Found [404]
        at org.springframework.ws.client.core.WebServiceTemplate.handleError(WebServiceTemplate.java:699)
        at org.springframework.ws.client.core.WebServiceTemplate.doSendAndReceive(WebServiceTemplate.java:609)
        at org.springframework.ws.client.core.WebServiceTemplate.sendAndReceive(WebServiceTemplate.java:555)
        at org.springframework.integration.ws.MarshallingWebServiceOutboundGateway.doHandle(MarshallingWebServiceOutboundGateway.java:87)
        at com.acme.webservice.MarshallingWebServiceOutboundGateway.doHandle(MarshallingWebServiceOutboundGateway.java:60)
        at org.springframework.integration.ws.AbstractWebServiceOutboundGateway.handleRequestMessage(AbstractWebServiceOutboundGateway.java:188)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:183)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:162)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
        at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:194)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:183)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:162)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
        at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:360)
        at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:271)
        at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:188)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:183)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:162)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)

任何建议/帮助将不胜感激!
谢谢,
五、


更新 1

我将 trapExecution 设置为 false,直接将建议添加到网关,我必须使用 setOnFailureExpressionString 否则流程会挂起。那么现在的样子如下:

@Bean
@Autowired
public MarshallingWebServiceOutboundGateway webserviceOutboundGateway(...) {
    MarshallingWebServiceOutboundGateway gateway = new ...;
    gateway.setAdviceChain(Arrays.asList(skipAdvice()));
    return gateway;
}

@Autowired
@Qualifier("skipCallChannel")
private MessageChannel skipCallChannel;

@Bean
public Advice skipAdvice() {
    ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setFailureChannel(endChannel);
    advice.setOnFailureExpressionString("payload + ' was bad, with reason: ' + #exception");
    advice.setTrapException(true);
    return advice;
}

我收到以下错误:

2020-05-21 14:45:13,921 DEBUG org.springframework.integration.handler.BridgeHandler Thread=qtp14486859-19 MDC=16d7cc4c-c9da-449b-8bfa-504e6d81185d org.springframework.integration.handler.BridgeHandler@cdf55 received message: ErrorMessage [payload=org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice$MessageHandlingExpressionEvaluatingAdviceException: Handler Failed; nested exception is org.springframework.ws.client.WebServiceTransportException: Not Found [404], failedMessage=GenericMessage [payload=uk.co.acme._2009._03.ProcessTran@1759824, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1ddbf77, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1ddbf77, header.cqs.model=com.acme.model.FlowModel@cc006f, ws_soapAction=http://www.acme.co.uk/XRTEService/2009/03/ProcessTran, id=59e305ee-1672-d5ba-db4b-672198f25ab8, timestamp=1590068713891}], headers={id=14aeb6ed-77af-52bf-6b53-72bc0a4bb5c1, timestamp=1590068713921}]
2020-05-21 14:45:13,922 DEBUG org.springframework.integration.transformer.ContentEnricher$Gateway Thread=qtp14486859-19 MDC=16d7cc4c-c9da-449b-8bfa-504e6d81185d failure occurred in gateway sendAndReceive: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
2020-05-21 14:45:13,922 DEBUG org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway Thread=qtp14486859-19 MDC=16d7cc4c-c9da-449b-8bfa-504e6d81185d failure occurred in gateway sendAndReceive: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
2020-05-21 14:45:13,927 ERROR com.acme.webservice.OrchestrationServiceEndpoint Thread=qtp14486859-19 MDC=16d7cc4c-c9da-449b-8bfa-504e6d81185d Error
org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
        at org.springframework.integration.dispatcher.AbstractDispatcher.wrapExceptionIfNecessary(AbstractDispatcher.java:133)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:120)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:183)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:162)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
        at org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice.evaluateFailureExpression(ExpressionEvaluatingRequestHandlerAdvice.java:271)
        at org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice.doInvoke(ExpressionEvaluatingRequestHandlerAdvice.java:221)
        at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice.invoke(AbstractRequestHandlerAdvice.java:70)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185)
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
        at com.sun.proxy.$Proxy190.handleRequestMessage(Unknown Source)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.doInvokeAdvisedRequestHandler(AbstractReplyProducingMessageHandler.java:127)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:112)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:183)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:162)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
        at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:194)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:183)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:162)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)

我想我必须添加通道才能成功执行,但我不想这样做,因为它已经在处理程序所在的流程中定义(请参阅 .channel(OUTPUT_CHANNEL))。 advice中是否可以只设置故障通道?
谢谢!


更新 2

我按照 Gary 的建议做了(即向上游流添加了一个错误通道),错误消息现在被传送到错误通道。但是现在我有另一个问题与我原来的问题隐约相关。所以现在我有以下例外:

2020-05-22 10:10:48,023 ERROR com.acme.webservice.OrchestrationServiceEndpoint Thread=qtp14486859-13 MDC=16d7cc4c-c9da-449b-8bfa-504e6d81185d Error
org.springframework.messaging.MessagingException: failure occurred in error-handling flow; nested exception is org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'enrich.acmeRequest.output'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=uk.co.acme.payload.request._2017._06.Message@4a5e6c, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@19d4520, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@19d4520, ws_soapAction=http://www.acme.co.uk/XRTEService/ProcessTran, id=902bd270-89d8-62e9-b00f-b69399241bd1, timestamp=1590138648017}], ...}]
    at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:489)
    at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:426)
    at org.springframework.integration.transformer.ContentEnricher$Gateway.sendAndReceiveMessage(ContentEnricher.java:481)
    at org.springframework.integration.transformer.ContentEnricher.handleRequestMessage(ContentEnricher.java:383)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)

在主要流程中我有以下内容:

    return flow -> flow.channel(ORCH_REQUEST_INPUT)
            .<HomeRequest, HomeModel>transform(requestToModelTransformer)
            ...
            // 
            .enrich(this::acmePreCompRequestEnricher)
            .enrich(this::acmePreCompEnricher)
            .handle(this.acmePreCompResponseValidator())
            // 
            .enrich(this::mlRequestEnricher)
            .enrich(this::mlEnricher)
            // 
            .enrich(this::acmeRequestEnricher)
            .enrich(this::acmeEnricher)
            ...

在使用出站网关的acmePreCompEnricher 中抛出SOAP 错误。错误通道设置为 skip.ml.input 然后我必须遵循处理错误的流程:

    return flow -> flow.channel("skip.ml.input")
            .transform(ErrorMessage.class, (ErrorMessage m) -> {
                        Message originalMessage = ((MessageHandlingException)m.getPayload()).getFailedMessage();
                        return MessageBuilder.withPayload(originalMessage.getHeaders().get(HEADER_CQS_MODEL, HomeQuoteModel.class))
                                .copyHeaders(originalMessage.getHeaders())
                                .build();
                    })
            .channel("enrich.acmeRequest.output");

enrich.acmeRequest.output频道是acmeRequestEnricher的回复频道。我的意图是,如果没有错误,则执行流程中的所有任务,但如果 acmePreCompEnricher 中存在 SOAP 错误,则跳过 mlRequestEnrichermlEnricher 并直接转到 acmeRequestEnricher.
我认为问题是在故障情况下,SI 看不到什么订阅了 acmeRequestEnricher 的频道...?在这种情况下我能做什么?

ExpressionEvaluatingRequestHandlerAdvice上看到这个方法:

/**
 * If true, any exception will be caught and null returned.
 * Default false.
 * @param trapException true to trap Exceptions.
 */
public void setTrapException(boolean trapException) {

因此,将其设置为 true 以避免异常重新抛出。

也可以在文档中查看更多内容:https://docs.spring.io/spring-integration/docs/5.3.0.RELEASE/reference/html/messaging-endpoints.html#expression-advice

更新

由于除建议外一切正常,它看起来像是过时的 Java DSL 项目中的错误。请考虑将您的 skipAdvice 直接注入到 webserviceOutboundGateway 中。它有各自的 setter:

/**
 * Configure a list of {@link Advice}s to proxy a {@link #handleRequestMessage(Message)} method.
 * @param adviceChain the list of {@link Advice}s to use.
 */
public void setAdviceChain(List<Advice> adviceChain) {

更新2

好的。看起来您需要在处理完该 SOAP 错误后发回回复。 因此,您可能不需要 failureChannel 配置。到目前为止,您所拥有的表达式应该足以捕获异常并将一些纯字符串作为回复发送回顶部的网关。

正如 Gary Russell 在他的评论之一中所建议的那样,我在上游流中设置了一个错误通道(请参阅 UPDATE2)。