如何将错误从执行程序通道路由到错误通道?

How do I route errors from executor channel to error channel?

设置请参考附图。解释如下。

有一个接收请求的通用输入通道。从这个输入通道,有两个流:

  1. 流程 1 - 将请求存储到 DB

  2. 流程 2 - 将业务请求 Processing/forwarding 发送到其他外部系统

我希望流程 1 和流程 2 彼此独立。所以我将 Flow 1 放在执行程序通道上。这样,流程 1 中的错误不会中断流程 2。

流程 1 的解释:

  1. 代码从公共输入通道读取请求并将其放入执行器通道。
  2. 从执行者通道,class DBStore 读取请求并将其存储到数据库中。
  3. 我还有一个错误通道(对项目中的所有 classes 都是通用的)它会安静地记录错误

我有:

在绿色框中的代码中,我定义了一个ExpressionEvaluatingRequestHandlerAdvice,以便将执行程序通道上的任何错误发送到错误通道。我假设 ExpressionEvaluatingRequestHandlerAdvice 将自动应用于执行程序通道。

相反,如果出现错误,它会重新发布到 'Common input channel' 并重复处理,直到队列填满。

我需要:

我希望将执行程序通道上的任何错误发送到错误通道,在那里它会被安静地记录并处理消息。

从公共输入通道读取并放入执行器通道的代码:

    @Configuration
@EnableIntegration
public class InputChanneltoExecutorChannelConfig {

//DEFINING THE EXECUTOR CHANNEL
    @Bean
    public TaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutor();
    }

    @Bean(name="executorChannelToDB")
    public ExecutorChannel outboundRequests() {
        return new ExecutorChannel(taskExecutor());
    }
//DEFINE FAILURE CHANNEL FOR USE IN ExpressionEvaluatingRequestHandlerAdvice
    @Bean(name = "DBFailureChannel")
    public static MessageChannel getFailureChannel() {
        return new DirectChannel();
    }   

//MAIN METHOD THAT READS FROM INPUT CHANNEL AND SENDS TO EXECUTOR CHANNEL
    @Bean
    public IntegrationFlow outboundtoDB() {
        return IntegrationFlows
                .from("commonInputChannel")
                /*
                 * We publish the msg to be stored into the DB onto a executor
                 * channel (so that the DB operation is processed on a separate
                 * thread).
                 */
                .channel("executorChannelToDB").get();
                /****************************************************************************
                        *********************************************************
                 * How do I route the error from executor channel to error channel over here?
                        **********************************************************
                 ****************************************************************************/
    }

    /*
     * Create an advice bean to handle DB errors. In case of failure, send
     * response to a separate channel.
     */
    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setFailureChannelName("DBFailureChannel");
        advice.setOnFailureExpressionString("'##Error while storing request into DB'");
        advice.setTrapException(true);
        return advice;
    }

    /*
     * We create a separate flow for DB failure because in future we may need
     * other actions such as retries/notify support in addition to logging.
     */
    @Bean
    public IntegrationFlow failure() {
        return IntegrationFlows.from("DBFailureChannel")
                .channel("errorChannel").get();

    }   
}

更新: 根据 Gary 的建议,更新了 ERROR_CHANNEL 和 REPLY_CHANNEL.

   @Bean
    public IntegrationFlow outboundtoDB() {
        return IntegrationFlows
                .from("commonInputChannel")
                //Setting Headers
                .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, "errorChannel", true))
                .enrichHeaders(h -> h.header(MessageHeaders.REPLY_CHANNEL, "DBSuccessChannel", true))
                .channel("executorChannelToDB").get();

DBSuccess 通道设置为处理如下响应:

@Bean
public IntegrationFlow success() {
    return IntegrationFlows
            .from("DBSuccessChannel")
            .wireTap(
                    flow -> flow.handle(msg -> logger
                            .info("Response from storing in DB : "
                                    + msg.getPayload()))).get();
}

但我仍然得到错误,

2018-09-26 23:34:47.398 ERROR 17186 --- [SimpleAsyncTaskExecutor-465] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: nested exception is java.time.format.DateTimeParseException: Text 'sample creation timestamp' could not be parsed at index 0, failedMessage=GenericMessage [payload=com.td.sba.iep.schema.InstructionRs@37919153, headers={errorChannel=errorChannel, jms_destination=commonInputChannel, Solace_JMS_Prop_IS_Reply_Message=false, priority=0, jms_timestamp=1538018141672, JMS_Solace_isXML=true, replyChannel=DBSuccessChannel, jms_redelivered=true, JMS_Solace_DeliverToOne=false, JMS_Solace_ElidingEligible=false, JMS_Solace_DeadMsgQueueEligible=false, id=ff6c2ea6-b6d6-c67a-7943-6b7db33bb977, jms_messageId=ID:49.37.4.163d608166190664e70:0, timestamp=1538019287394}]

此处,jms_destination 仍设置为输入通道,错误不断被重新发布到 commonInputChannel。 你能帮忙吗?

该建议无济于事,因为它仅适用于该端点 - 不适用于下游流程,并且在任何情况下,即使适用,向执行者的移交也会成功,并且任何下游异常都由执行者(被包装在 ErrorHandlingTaskExecutorMessagePublishingErrorHandler 中)。

尝试用 header enricher 替换该组件,并设置 errorChannel header。或者您可以使用配置有错误通道的 MPEH 自己包装 TE(执行器通道将检测到 TE 已经是 EHTE)。

编辑

这对我来说很好...

@SpringBootApplication
public class So52526134Application {

    public static void main(String[] args) {
        SpringApplication.run(So52526134Application.class, args);
    }

    @Bean
    public IntegrationFlow mainFlow() {
        return IntegrationFlows.from(() -> "foo", e -> e.poller(Pollers.fixedDelay(5000)))
                .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, "myErrors.input"))
                .channel(MessageChannels.executor(executor()))
                .handle((p, h) -> {
                    throw new RuntimeException("foo");
                })
                .get();
    }

    @Bean
    public IntegrationFlow myErrors() {
        return f -> f.handle((p, h) -> {
            System.out.println("in my error flow");
            return p;
        })
        .handle(System.out::println);
    }

    @Bean
    public TaskExecutor executor() {
        return new ThreadPoolTaskExecutor();
    }

}

in my error flow
ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: ...