Spring 自定义错误通道中任何异常的集成无限循环
Spring Integration Infinite Loop on any Exception in Custom Error Channel
我们的应用程序有一个自定义错误通道,主要通过电子邮件通知集成失败。当自定义错误通道的 SendEmail Http 调用(WebFlux.outboundGateway)中抛出异常时,集成流程进入无限循环。
请帮助阻止这种行为
@Bean("appErrorChannel")
public MessageChannel appErrorChannel() {
return new DirectChannel();
}
@Bean("appErrorFlow")
public IntegrationFlow errorFlow() {
// @formatter:off
return IntegrationFlows.from(appErrorChannel())
.<MessagingException> log(ERROR, message -> "Exception: " + ExceptionUtils.getStackTrace(message.getPayload()))
.transform(transformer, "errorMessage")
.transform(transformer, "emailRequest")
.log(INFO, message -> "Email Request: " + message)
.enrichHeaders(headerEnricherSpec -> headerEnricherSpec.header(CONTENT_TYPE, APPLICATION_JSON_VALUE))
.enrichHeaders(Collections.singletonMap(MessageHeaders.ERROR_CHANNEL, errorFlowErrorChannel()))
.handle(WebFlux.outboundGateway(sendEmailURL, webClient)
.httpMethod(POST)
.expectedResponseType(String.class)
.mappedRequestHeaders(CONTENT_TYPE))
.log(INFO, message -> "Email Response: " + message)
.get();
// @formatter:on
}
@Bean("errorFlowErrorChannel")
public MessageChannel errorFlowErrorChannel() {
return new DirectChannel();
}
@Bean("errorChannelErrorFlow")
public IntegrationFlow errorChannelErrorFlow() {
// @formatter:off
return IntegrationFlows.from(errorFlowErrorChannel())
.<MessagingException> log(FATAL, message -> "Exception in Error Flow: " + ExceptionUtils.getStackTrace(message.getPayload()))
.get();
// @formatter:on
}
异常:
2022-04-09 11:45:11,198[0;39m [boundedElastic-1 ] [31mERROR[0;39m [36mo.s.i.w.o.WebFluxRequestExecutingMessageHandler[0;39m - [35me2abc8b6eba0119c[0;39m Failed to send async reply
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application-1.appErrorChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: nested exception is org.springframework.web.reactive.function.client.WebClientResponseException$BadRequest: 400 Bad Request from POST ****** URL ******, failedMessage=GenericMessage [payload=******* SUPPRESSED ********}, headers={b3=e2abc8b6eba0119c-c4254a192695705d-1, nativeHeaders={}, errorChannel=bean 'appErrorChannel'; defined in: 'class path resource [**************}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:457)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendErrorMessage(AbstractMessageProducingHandler.java:496)
at org.springframework.integration.handler.AbstractMessageProducingHandler$ReplyFutureCallback.onFailure(AbstractMessageProducingHandler.java:555)
at org.springframework.util.concurrent.ListenableFutureCallbackRegistry.notifyFailure(ListenableFutureCallbackRegistry.java:86)
at org.springframework.util.concurrent.ListenableFutureCallbackRegistry.failure(ListenableFutureCallbackRegistry.java:158)
.....
.....
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
确实不建议为整个应用程序设置一个全局错误通道。
考虑为这个特定的错误处理流程建立自己的错误通道。
通过 enrichHeaders(MessageHeaders.ERROR_CHANNEL, "errorFlowErrorChannel")
设置它,不会出现无限循环,因为来自 WebFlux.outboundGateway()
的错误不会再转发到 appErrorChannel
。
我们的应用程序有一个自定义错误通道,主要通过电子邮件通知集成失败。当自定义错误通道的 SendEmail Http 调用(WebFlux.outboundGateway)中抛出异常时,集成流程进入无限循环。
请帮助阻止这种行为
@Bean("appErrorChannel")
public MessageChannel appErrorChannel() {
return new DirectChannel();
}
@Bean("appErrorFlow")
public IntegrationFlow errorFlow() {
// @formatter:off
return IntegrationFlows.from(appErrorChannel())
.<MessagingException> log(ERROR, message -> "Exception: " + ExceptionUtils.getStackTrace(message.getPayload()))
.transform(transformer, "errorMessage")
.transform(transformer, "emailRequest")
.log(INFO, message -> "Email Request: " + message)
.enrichHeaders(headerEnricherSpec -> headerEnricherSpec.header(CONTENT_TYPE, APPLICATION_JSON_VALUE))
.enrichHeaders(Collections.singletonMap(MessageHeaders.ERROR_CHANNEL, errorFlowErrorChannel()))
.handle(WebFlux.outboundGateway(sendEmailURL, webClient)
.httpMethod(POST)
.expectedResponseType(String.class)
.mappedRequestHeaders(CONTENT_TYPE))
.log(INFO, message -> "Email Response: " + message)
.get();
// @formatter:on
}
@Bean("errorFlowErrorChannel")
public MessageChannel errorFlowErrorChannel() {
return new DirectChannel();
}
@Bean("errorChannelErrorFlow")
public IntegrationFlow errorChannelErrorFlow() {
// @formatter:off
return IntegrationFlows.from(errorFlowErrorChannel())
.<MessagingException> log(FATAL, message -> "Exception in Error Flow: " + ExceptionUtils.getStackTrace(message.getPayload()))
.get();
// @formatter:on
}
异常:
2022-04-09 11:45:11,198[0;39m [boundedElastic-1 ] [31mERROR[0;39m [36mo.s.i.w.o.WebFluxRequestExecutingMessageHandler[0;39m - [35me2abc8b6eba0119c[0;39m Failed to send async reply
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application-1.appErrorChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: nested exception is org.springframework.web.reactive.function.client.WebClientResponseException$BadRequest: 400 Bad Request from POST ****** URL ******, failedMessage=GenericMessage [payload=******* SUPPRESSED ********}, headers={b3=e2abc8b6eba0119c-c4254a192695705d-1, nativeHeaders={}, errorChannel=bean 'appErrorChannel'; defined in: 'class path resource [**************}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:457)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendErrorMessage(AbstractMessageProducingHandler.java:496)
at org.springframework.integration.handler.AbstractMessageProducingHandler$ReplyFutureCallback.onFailure(AbstractMessageProducingHandler.java:555)
at org.springframework.util.concurrent.ListenableFutureCallbackRegistry.notifyFailure(ListenableFutureCallbackRegistry.java:86)
at org.springframework.util.concurrent.ListenableFutureCallbackRegistry.failure(ListenableFutureCallbackRegistry.java:158)
.....
.....
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
确实不建议为整个应用程序设置一个全局错误通道。
考虑为这个特定的错误处理流程建立自己的错误通道。
通过 enrichHeaders(MessageHeaders.ERROR_CHANNEL, "errorFlowErrorChannel")
设置它,不会出现无限循环,因为来自 WebFlux.outboundGateway()
的错误不会再转发到 appErrorChannel
。