Spring 电子邮件集成错误处理最后一次
Spring Integration Error Handling with Email Once at end
如何处理下面 SpringIntegrationFlow 中抛出的错误,只需记录错误并继续下一条记录。无论 1 个或多个记录失败,也应该能够只发送一次电子邮件。是否有与 Spring Batch 的 StepListener 等效的东西?谢谢
return IntegrationFlows.from(jdbcMessageSource(), p -> p.poller(pollerSpec()))
.enrichHeaders(Collections.singletonMap(MessageHeaders.ERROR_CHANNEL, appErrorChannel()))
.split()
.channel(c -> c.executor(Executors.newCachedThreadPool()))
.transform(transformer, "transform")
.enrichHeaders(headerEnricherSpec -> headerEnricherSpec.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE))
.handle(Http.outboundGateway(url)
.httpMethod(HttpMethod.POST)
.expectedResponseType(String.class)
.requestFactory(requestFactory))
.get();
@Bean
public MessageChannel appErrorChannel() {
return new DirectChannel();
}
@Bean("appErrorFlow")
public IntegrationFlow appErrorFlow() {
// @formatter:off
return IntegrationFlows.from(appErrorChannel())
.log(Level.ERROR, message -> " Failed Message " + message.getPayload())
.aggregate(a -> a.correlationStrategy(m -> !m.getHeaders().isEmpty()))
.handle(Http.outboundGateway(mailURL)
.httpMethod(HttpMethod.POST))
.get();
// @formatter:on
}
异常:
2022-01-13 06:41:31,702 [Worker-1 ] WARN o.s.i.c.MessagePublishingErrorHandler - 005006f2dee5b673 Error message was not delivered.
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unknown.channel.name'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=ErrorMessage [payload=*******Removed On Purpose*******]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.channel.MessagePublishingErrorHandler.handleError(MessagePublishingErrorHandler.java:96)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute[=12=](ErrorHandlingTaskExecutor.java:60)
at org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:64)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
... 11 common frames omitted
我认为最接近 StepListener
的是 ChannelInterceptor
。
请参阅 IntegrationFlowDefinition.intercept()
运算符以放置在该流程中的端点之间。
然而,根据您的拆分器配置,您似乎不会丢失任何内容,只会记录错误。 c.executor()
可以将每条记录的工作转移到线程池中,并且它的错误不会影响其余记录。
无论如何,在 split()
- .enrichHeaders(Collections.singletonMap(MessageHeaders.ERROR_CHANNEL, myErrorChannel()))
.
之前将自定义错误通道设置为 headers 听起来更好
比起你有一个聚合器订阅了这个频道,你可以在其中聚合批处理的错误并发出消息以发送电子邮件。这实际上每批次只会发生一次,如果没有错误,则 none 根本不会发生。
如何处理下面 SpringIntegrationFlow 中抛出的错误,只需记录错误并继续下一条记录。无论 1 个或多个记录失败,也应该能够只发送一次电子邮件。是否有与 Spring Batch 的 StepListener 等效的东西?谢谢
return IntegrationFlows.from(jdbcMessageSource(), p -> p.poller(pollerSpec()))
.enrichHeaders(Collections.singletonMap(MessageHeaders.ERROR_CHANNEL, appErrorChannel()))
.split()
.channel(c -> c.executor(Executors.newCachedThreadPool()))
.transform(transformer, "transform")
.enrichHeaders(headerEnricherSpec -> headerEnricherSpec.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE))
.handle(Http.outboundGateway(url)
.httpMethod(HttpMethod.POST)
.expectedResponseType(String.class)
.requestFactory(requestFactory))
.get();
@Bean
public MessageChannel appErrorChannel() {
return new DirectChannel();
}
@Bean("appErrorFlow")
public IntegrationFlow appErrorFlow() {
// @formatter:off
return IntegrationFlows.from(appErrorChannel())
.log(Level.ERROR, message -> " Failed Message " + message.getPayload())
.aggregate(a -> a.correlationStrategy(m -> !m.getHeaders().isEmpty()))
.handle(Http.outboundGateway(mailURL)
.httpMethod(HttpMethod.POST))
.get();
// @formatter:on
}
异常:
2022-01-13 06:41:31,702 [Worker-1 ] WARN o.s.i.c.MessagePublishingErrorHandler - 005006f2dee5b673 Error message was not delivered.
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unknown.channel.name'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=ErrorMessage [payload=*******Removed On Purpose*******]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.channel.MessagePublishingErrorHandler.handleError(MessagePublishingErrorHandler.java:96)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute[=12=](ErrorHandlingTaskExecutor.java:60)
at org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:64)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
... 11 common frames omitted
我认为最接近 StepListener
的是 ChannelInterceptor
。
请参阅 IntegrationFlowDefinition.intercept()
运算符以放置在该流程中的端点之间。
然而,根据您的拆分器配置,您似乎不会丢失任何内容,只会记录错误。 c.executor()
可以将每条记录的工作转移到线程池中,并且它的错误不会影响其余记录。
无论如何,在 split()
- .enrichHeaders(Collections.singletonMap(MessageHeaders.ERROR_CHANNEL, myErrorChannel()))
.
比起你有一个聚合器订阅了这个频道,你可以在其中聚合批处理的错误并发出消息以发送电子邮件。这实际上每批次只会发生一次,如果没有错误,则 none 根本不会发生。