Spring Integration Error Handling with Email Once at End

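How can I handle errors thrown in the Spring Integration flow below so that each error is simply logged and processing continues with the next record? Whether one record fails or several do, only a single email should be sent. Is there an equivalent of Spring Batch's StepListener? Thanks.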

return IntegrationFlows.from(jdbcMessageSource(), p -> p.poller(pollerSpec()))
                       .enrichHeaders(Collections.singletonMap(MessageHeaders.ERROR_CHANNEL, appErrorChannel()))
                       .split()
                       .channel(c -> c.executor(Executors.newCachedThreadPool()))
                       .transform(transformer, "transform")
                       .enrichHeaders(headerEnricherSpec -> headerEnricherSpec.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE))
                       .handle(Http.outboundGateway(url)
                                   .httpMethod(HttpMethod.POST)
                                   .expectedResponseType(String.class)
                                   .requestFactory(requestFactory))
                       .get();

@Bean
public MessageChannel appErrorChannel() {

    return new DirectChannel();
}

@Bean("appErrorFlow")
public IntegrationFlow appErrorFlow() {

    // @formatter:off
    return IntegrationFlows.from(appErrorChannel())
                            .log(Level.ERROR, message -> " Failed Message " + message.getPayload())
                            .aggregate(a -> a.correlationStrategy(m -> !m.getHeaders().isEmpty()))
                            .handle(Http.outboundGateway(mailURL)
                                        .httpMethod(HttpMethod.POST))
                            .get();
    // @formatter:on
}

Exception:

2022-01-13 06:41:31,702 [Worker-1                  ] WARN  o.s.i.c.MessagePublishingErrorHandler - 005006f2dee5b673 Error message was not delivered.
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unknown.channel.name'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=ErrorMessage [payload=*******Removed On Purpose*******]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.channel.MessagePublishingErrorHandler.handleError(MessagePublishingErrorHandler.java:96)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:60)
    at org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:64)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    ... 11 common frames omitted

I think the closest thing to a StepListener is a ChannelInterceptor.

See the IntegrationFlowDefinition.intercept() operator, which lets you place one between endpoints in that flow.

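However, judging by your splitter configuration, it looks like you don't lose anything and the errors are only logged. The c.executor() channel hands the work for each record off to a thread pool, so an error in one record does not affect the rest.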

Anyway, it sounds like it would be better to set a custom error channel in the headers before that split(): .enrichHeaders(Collections.singletonMap(MessageHeaders.ERROR_CHANNEL, myErrorChannel())).

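Then you have an aggregator subscribed to this channel, where you can aggregate the errors for the batch and emit a single message to send the email. That happens only once per batch, and not at all if there are no errors.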
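
To make the "once per batch" idea concrete, here is a minimal plain-Java sketch of the same control flow, deliberately written without Spring Integration so it can be run on its own. It is not the framework's API: the class BatchErrorReport and the method processBatch are hypothetical names chosen for illustration. The thread pool plays the role of c.executor(...), the failures list plays the role of the aggregator on the error channel, and the returned summary stands in for the single message that would trigger the email.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Hypothetical illustration only; none of these names come from Spring Integration.
final class BatchErrorReport {

    // Runs the handler for every record on a thread pool. A failing record is
    // collected instead of rethrown, so the remaining records still run.
    // Returns one summary for the whole batch, or empty if nothing failed.
    static <T> Optional<String> processBatch(List<T> records, Consumer<T> handler) {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            // Split step: one task per record.
            List<Future<?>> futures = new ArrayList<>();
            for (T record : records) {
                futures.add(pool.submit(() -> handler.accept(record)));
            }
            // Aggregate step: wait for every task and gather the failures in record order.
            List<String> failures = new ArrayList<>();
            for (int i = 0; i < futures.size(); i++) {
                try {
                    futures.get(i).get();
                } catch (ExecutionException e) {
                    failures.add(records.get(i) + ": " + e.getCause().getMessage());
                }
            }
            // Emit step: a single summary per batch, nothing when there were no errors.
            return failures.isEmpty()
                    ? Optional.empty()
                    : Optional.of(failures.size() + " failed record(s): " + String.join("; ", failures));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the batch", e);
        } finally {
            pool.shutdown();
        }
    }
}
```

A caller would send the email only when the returned Optional is present. In the real flow the aggregator additionally needs some way to know that a batch is complete (for example a release strategy or a group timeout), which this sketch sidesteps by waiting on the futures directly.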