在 Spring 集成中为 ContentEnricher 定义异常回退行为

Defining fallback behavior on exception for a ContentEnricher in Spring Integration

我已经通过 Spring 集成定义了一个很好的流程,在标称情况下,它按照我想要的方式工作。

但是我还没有找到一种方法来定义处理错误的行为(即将输​​入行标记为失败)

这是我得到的错误:

 [ask-scheduler-2] cMessagingTemplate$TemporaryReplyChannel : Reply message received but the receiving thread has exited due to an exception while sending the request message:ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [outboundGateway]; nested exception is...

这是我的流配置(简化):

@Configuration
@EnableIntegration
public class IntegrationConfig {


    @Autowired
    private DataSource dataSource;

    @Bean
    public IntegrationFlow mainFlow() {
        //noinspection unchecked
        return IntegrationFlows.from(jdbcMessageSource(),
                c -> c.poller(Pollers.fixedRate(5000)
                        .transactional(transactionManager())))
                .split()
                .enrich(e -> e
                        .requestChannel(subChannel())
                        .requestPayload(Message::getPayload)
                        .propertyExpression("fooId", "payload.id"))
                .handle(barHandler())
                .get();
    }

    @Bean
    public IntegrationFlow enricherFlow() {
        return IntegrationFlows.from(subChannel())
                .handle(outboundGateway())
                .get();
    }

    @Bean
    public MessageChannel subChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageSource<Object> jdbcMessageSource() {
        return new JdbcPollingChannelAdapter(this.dataSource,
                "select * from FOO");
    }

    @Bean
    public JdbcOutboundGateway outboundGateway() {
        ...
    }

    @Bean
    public JdbcMessageHandler barHandler() {
        return new JdbcMessageHandler(dataSource,
                "INSERT INTO BAR ...");
    }


    @Bean
    public PlatformTransactionManager transactionManager() {
        return new DataSourceTransactionManager(dataSource);
    }
}

我原以为我可以在默认的 errorChannel 上获取错误并在那里执行必要的任务,但我还没有找到完成这项工作的方法。

非常感谢任何想法和帮助!

你知道我们似乎错过了在 Java DSL 中为 EnricherSpec 公开 errorChannel - 请随时就此事提出 JIRA issue。但无论如何,看起来我们可以访问那个 属性:

.enrich(e -> {
            ContentEnricher contentEnricher =
                    e.requestChannel(subChannel())
                            .requestPayload(Message::getPayload)
                            .propertyExpression("fooId", "payload.id"))
                            .get().getT2();
            contentEnricher.setErrorChannel(enricherErrorChannel());
        })

现在您可以将任何 .handle() 流添加到 enricherErrorChannel() 并在钓鱼时处理 ErrorMessage。例如,通常 ErrorMessage 包含一个 payload 作为 MessagingException,而我们有一个 failedMessage 属性。正是那个包含了关于你原来的 payloadheaders 的信息:

https://docs.spring.io/spring-integration/docs/5.0.0.RELEASE/reference/html/configuration.html#namespace-errorhandler