PublishSubscribeChannel 中的错误处理

Error handling in PublishSubscribeChannel

我正在使用 Spring 使用 DSL 的集成来处理 JMS 和 REST 服务之间的通信。要求是消息应该无限期地重新传递。在一种情况下,我必须顺序执行两个操作。如果第一个失败,则不应执行第二个,但万一它是 4xx 错误,我不应该尝试重新传递它。我的代码如下所示:

IntegrationFlows.from(Jms.messageDrivenChannelAdapter(Jms.container(connectionFactory, destinationn)).get())
   .publishSubscribeChannel(c -> c
      .subscribe(firstRestOperation ->                                                  
         firstRestOperation
         .transform(originalMessageToFirstRequestTransformer())
         .handle(Http.outboundGateway(restApiBaseUri + "/first-endpoint", restTemplate)
                     .httpMethod(HttpMethod.POST).get())  //when this handler receives HTTP Status 4xx, 
                                                          //second operation shouldn't be executed and 
                                                          //and message shouldn't be redelievered
      .subscribe(secondRestOperation->
         secondRestOperation
         .transform(originalMessageToSecondRequestTransformer())
         .handle(Http.outboundGateway(restApiBaseUri + "/second-endpoint", restTemplate).httpMethod(HttpMethod.POST).get())
).get();

class MyErrorHandler extends DefaultResponseErrorHandler { //this is used in Option B


    @Override
    public void handleError(ClientHttpResponse response) throws IOException {
        if(response.getStatusCode().is4xxClientError()){
            log.warn(...);
        }else{
            super.handleError(response);
        }
    }
}

@Bean
public RestTemplate restTemplate() {
   RestTemplate restTemplate = new RestTemplate();
   restTemplate.setErrorHandler(myErrorHandler); //this is used in Option B
   return restTemplate;
}

我怎样才能满足这些要求?我唯一的想法是在提交 JMS 会话时以某种方式中断 IntegrationFlow。

感谢您的任何建议。

更新

选项 A:目前:

选项B:我也可以处理4xx错误,那么:

但这会导致操作 2 被执行

我需要的是:

更新 2

我想我可能有所进展。正如@gary-russel 建议的那样,我添加了错误通道,并处理了 4xx 错误:

    @Bean
    public MessageProducerSupport inputUpsertCustomerMessageProducerSupport() {
        return Jms.messageDrivenChannelAdapter(Jms.container(connectionFactory, destination).messageSelector(jmsSelector)).errorChannel(errorHandlingChannel).get();
    }


    @Bean
    public PublishSubscribeChannel errorHandlingChannel() {
        return MessageChannels.publishSubscribe().get();
    }
    
    @Bean
    public ErrorMessageExceptionTypeRouter errorMessageExceptionTypeRouter() {
        ErrorMessageExceptionTypeRouter router = new ErrorMessageExceptionTypeRouter();
        router.setChannelMapping(HttpClientErrorException.class.getName(), "clientErrorMessageChannel");
        router.setDefaultOutputChannel(unhandledErrorsChannel());
        return router;
    }


    @Bean
    public IntegrationFlow errorHandlingFlow() {
        return IntegrationFlows.from(customErrorChannel())
                .log()
                .route(errorMessageExceptionTypeRouter())
                .get();
    }
    

    @Bean
    public MessageChannel clientErrorMessageChannel(){
        return MessageChannels
                .direct()
                .get();
    }


    @Bean
    public IntegrationFlow clientErrorFlow() {
        return IntegrationFlows.from(clientErrorMessageChannel())
                .handle(message -> log.warn(...)    //handle error here
                .get();
    }


        @Bean
    public MessageChannel unhandledErrorsChannel(){
        return MessageChannels.direct().get();
    }
    
        @Bean
    public IntegrationFlow unhandledErrorsFlow(){
        //how should I implement it?
    }

我只想处理 4xx 错误,应传播其余错误并导致 JMS 消息重新传递。我尝试不在 ErrorMessageExceptionTypeRouter 中设置 defaultOutputChannel (比抛出另一个异常)或将 defaultOutputChannel 设置为默认值 errorChannel (比处理所有错误)。

更新 3

在以下位置找到解决方案:

这是我的错误处理流程的代码:

 @Bean
    public MessageProducerSupport inputUpsertCustomerMessageProducerSupport() {
        return Jms.messageDrivenChannelAdapter(Jms.container(connectionFactory, destination).messageSelector(jmsSelector)).errorChannel(customErrorChannel()).get();
    }

    @Bean
    public PublishSubscribeChannel customErrorChannel() {
        return MessageChannels.publishSubscribe().get();
    }

    @Bean
    public ErrorMessageExceptionTypeRouter errorMessageExceptionTypeRouter() {
        ErrorMessageExceptionTypeRouter router = new ErrorMessageExceptionTypeRouter();
        router.setChannelMapping(HttpClientErrorException.class.getName(), "clientErrorMessageChannel");
        router.setDefaultOutputChannel(unhandledErrorsChannel());
        return router;
    }

    @Bean
    public MessageChannel clientErrorMessageChannel(){
        return MessageChannels
                .direct()
                .get();
    }

    @Bean
    public MessageChannel unhandledErrorsChannel(){
        return MessageChannels.direct().get();
    }

    @Bean
    public IntegrationFlow unhandledErrorsFlow(){
        return IntegrationFlows.from(unhandledErrorsChannel()).handle("thisBeanName", "handleError").get();
    }

    public void handleError(Throwable t) throws Throwable {
        log.warn("Received unhandled exception");
        throw t;
    }

    @Bean
    public IntegrationFlow clientErrorFlow() {
        return IntegrationFlows.from(clientErrorMessageChannel())
                .handle(message -> log.warn("Received HTTP Status 4xx with message: " + ((MessageHandlingException)message.getPayload()).getCause().getMessage()))
                .get();
    }

    @Bean
    public IntegrationFlow errorHandlingFlow() {
        return IntegrationFlows.from(customErrorChannel())
                .log()
                .route(errorMessageExceptionTypeRouter())
                .get();
    }

所以解决方案是将异常重定向到一个流程,该流程将通过重新抛出它们来处理它们。太糟糕了 BaseIntegrationFlow 没有接受和抛出的方法 Throwable - 现在只能通过指定要调用的 bean 和方法名称来实现。

这是默认行为;除非 ignoreFailures 属性 是 true(默认情况下是 false),否则不会调用第二个订阅者。

您需要显示上游流,但要“捕获”异常,您需要向(大概)消息驱动的入站适配器添加错误通道并在那里处理异常。