Spring Integration Infinite Loop on any Exception in Custom Error Channel

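Our application has a custom error channel whose main job is to send email notifications about integration failures. When an exception is thrown by the SendEmail HTTP call (WebFlux.outboundGateway) inside that custom error channel's flow, the integration flow goes into an infinite loop.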

Please help us stop this behavior.

    @Bean("appErrorChannel")
    public MessageChannel appErrorChannel() {
    
        return new DirectChannel();
    }
    
    @Bean("appErrorFlow")
    public IntegrationFlow errorFlow() {
    
        // @formatter:off
        return IntegrationFlows.from(appErrorChannel())
                                .<MessagingException> log(ERROR, message -> "Exception: " + ExceptionUtils.getStackTrace(message.getPayload()))
                                .transform(transformer, "errorMessage")
                                .transform(transformer, "emailRequest")
                                .log(INFO, message -> "Email Request: " + message)
                                .enrichHeaders(headerEnricherSpec -> headerEnricherSpec.header(CONTENT_TYPE, APPLICATION_JSON_VALUE))
                                .enrichHeaders(Collections.singletonMap(MessageHeaders.ERROR_CHANNEL, errorFlowErrorChannel()))
                                .handle(WebFlux.outboundGateway(sendEmailURL, webClient)
                                                .httpMethod(POST)
                                                .expectedResponseType(String.class)
                                                .mappedRequestHeaders(CONTENT_TYPE))
                                .log(INFO, message -> "Email Response: " + message)
                                .get();
        // @formatter:on
    }

    @Bean("errorFlowErrorChannel")
    public MessageChannel errorFlowErrorChannel() {
    
        return new DirectChannel();
    }
        
    @Bean("errorChannelErrorFlow")
    public IntegrationFlow errorChannelErrorFlow() {
    
        // @formatter:off
        return IntegrationFlows.from(errorFlowErrorChannel())
                                .<MessagingException> log(FATAL, message -> "Exception in Error Flow: " + ExceptionUtils.getStackTrace(message.getPayload()))
                                .get();
        // @formatter:on
    }

Exception:

2022-04-09 11:45:11,198 [boundedElastic-1          ] ERROR o.s.i.w.o.WebFluxRequestExecutingMessageHandler - e2abc8b6eba0119c Failed to send async reply
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application-1.appErrorChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: nested exception is org.springframework.web.reactive.function.client.WebClientResponseException$BadRequest: 400 Bad Request from POST ****** URL ******, failedMessage=GenericMessage [payload=******* SUPPRESSED ********}, headers={b3=e2abc8b6eba0119c-c4254a192695705d-1, nativeHeaders={}, errorChannel=bean 'appErrorChannel'; defined in: 'class path resource [**************}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:457)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendErrorMessage(AbstractMessageProducingHandler.java:496)
    at org.springframework.integration.handler.AbstractMessageProducingHandler$ReplyFutureCallback.onFailure(AbstractMessageProducingHandler.java:555)
    at org.springframework.util.concurrent.ListenableFutureCallbackRegistry.notifyFailure(ListenableFutureCallbackRegistry.java:86)
    at org.springframework.util.concurrent.ListenableFutureCallbackRegistry.failure(ListenableFutureCallbackRegistry.java:158)
.....
.....
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)

It is really not recommended to have a single global error channel for the whole application.

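Consider giving this specific error-handling flow its own error channel. Set it via enrichHeaders(MessageHeaders.ERROR_CHANNEL, "errorFlowErrorChannel"); there will then be no infinite loop, because an error from WebFlux.outboundGateway() will no longer be forwarded to appErrorChannel.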
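
A minimal sketch of what this could look like, assuming Spring Integration 5.x and a WebClient bean in the context. The class name ErrorFlowConfig and the SEND_EMAIL_URL value are hypothetical placeholders, and the transformer and logging steps from the question are left out for brevity. One thing worth checking, as an assumption rather than something stated above: the header enricher does not overwrite an existing header by default, and the stack trace shows the failed message still carrying errorChannel=appErrorChannel, so the sketch passes overwrite=true explicitly.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.webflux.dsl.WebFlux;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.web.reactive.function.client.WebClient;

@Configuration
public class ErrorFlowConfig {

    // Hypothetical placeholder: the question does not show the real URL.
    private static final String SEND_EMAIL_URL = "http://localhost:8080/send-email";

    @Bean("appErrorChannel")
    public MessageChannel appErrorChannel() {
        return new DirectChannel();
    }

    @Bean("errorFlowErrorChannel")
    public MessageChannel errorFlowErrorChannel() {
        return new DirectChannel();
    }

    @Bean("appErrorFlow")
    public IntegrationFlow errorFlow(WebClient webClient) {
        return IntegrationFlows.from(appErrorChannel())
                // Replace whatever errorChannel header the message already carries
                // (overwrite = true), so a failure in the gateway below is routed
                // to errorFlowErrorChannel instead of back to appErrorChannel.
                .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL,
                        errorFlowErrorChannel(), true))
                .handle(WebFlux.outboundGateway(SEND_EMAIL_URL, webClient)
                        .httpMethod(HttpMethod.POST)
                        .expectedResponseType(String.class))
                .log(LoggingHandler.Level.INFO, m -> "Email Response: " + m)
                .get();
    }

    @Bean("errorChannelErrorFlow")
    public IntegrationFlow errorChannelErrorFlow() {
        // Terminal flow: it only logs, and calls nothing that could fail
        // and route another error message back into the error handling.
        return IntegrationFlows.from(errorFlowErrorChannel())
                .log(LoggingHandler.Level.FATAL,
                        m -> "Exception in Error Flow: " + m.getPayload())
                .get();
    }
}
```

The design point is that the second flow does nothing except log, so a failed email call ends there rather than producing a new error message for appErrorChannel.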