Spring Integration - JdbcPollingChannelAdapter commit instead of rollback on handled Exceptions

I am using the Spring 4.1.x, Spring Integration 4.1.x and Spring Integration Java DSL 1.0.x APIs for an EIP flow, in which we use a JdbcPollingChannelAdapter as the entry point into the flow, consuming messages from an Oracle database table.

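Even though we have an ErrorHandler configured on the JdbcPollingChannelAdapter's Poller, we are seeing that the session's transaction is still rolled back, not committed, when a RuntimeException is thrown and handled correctly by the ErrorHandler.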

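After reading through this thread, I get the feeling that it is not possible to prevent the rollback and force a commit instead. Is this correct? And, if there is a way, what is the cleanest approach to force a commit instead of a rollback when the error has been handled safely?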

Current config:

IntegrationConfig.java:

@Bean
public MessageSource<Object> jdbcMessageSource() {

    JdbcPollingChannelAdapter adapter = new JdbcPollingChannelAdapter(
            dataSource,
            "select * from SERVICE_TABLE where rownum <= 10 for update skip locked");
    adapter.setUpdateSql("delete from SERVICE_TABLE where SERVICE_MESSAGE_ID in (:id)");
    adapter.setRowMapper(serviceMessageRowMapper);
    adapter.setMaxRowsPerPoll(1);
    adapter.setUpdatePerRow(true);
    return adapter;
}

@SuppressWarnings("unchecked")
@Bean
public IntegrationFlow inFlow() {

    return IntegrationFlows
            .from(jdbcMessageSource(),
                    c -> {
                        c.poller(Pollers.fixedRate(100)
                                .maxMessagesPerPoll(10)
                                .transactional(transactionManager)
                                .errorHandler(errorHandler));
                    })
                .channel(inProcessCh()).get();
}

ErrorHandler.java

@Component
public class ErrorHandler implements org.springframework.util.ErrorHandler {

    @Autowired
    private PlatformTransactionManager transactionManager;

    private static final Logger logger = LogManager.getLogger();

    @Override
    public void handleError(Throwable t) {

        logger.trace("handling error:{}", t.getMessage(), t);

        // handle error code here...

        // we want to force commit the transaction here?
        TransactionStatus txStatus = transactionManager.getTransaction(null);
        transactionManager.commit(txStatus);
    }
}

--- EDITED to include the ExpressionEvaluatingRequestHandlerAdvice bean ---

@Bean
public Advice expressionEvaluatingRequestHandlerAdvice() {
    ExpressionEvaluatingRequestHandlerAdvice expressionEvaluatingRequestHandlerAdvice = new ExpressionEvaluatingRequestHandlerAdvice();
    expressionEvaluatingRequestHandlerAdvice.setTrapException(true);
    expressionEvaluatingRequestHandlerAdvice.setOnSuccessExpression("payload");
    expressionEvaluatingRequestHandlerAdvice
            .setOnFailureExpression("payload");
    expressionEvaluatingRequestHandlerAdvice.setFailureChannel(errorCh());
    return expressionEvaluatingRequestHandlerAdvice;
}

--- EDITED to show the dummy test message handler ---

    .handle(Message.class,
                    (m, h) -> {

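                        // headers.get(..) returns Object; compare null-safely instead of auto-unboxing
                        boolean forceTestError = Boolean.TRUE.equals(m.getHeaders().get("forceTestError"));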
                        if (forceTestError) {
                            logger.trace("simulated forced TestException");
                            TestException testException = new TestException(
                                    "forced test exception");
                            throw testException;
                        }

                        logger.trace("simulated successful process");

                        return m;
                    }, e-> e.advice(expressionEvaluatingRequestHandlerAdvice())

--- EDITED to show the ExecutorChannelInterceptor approach ---

@Bean
public IntegrationFlow inFlow() {

    return IntegrationFlows
            .from(jdbcMessageSource(), c -> {
                c.poller(Pollers.fixedRate(100).maxMessagesPerPoll(10)
                        .transactional(transactionManager));
            })
            .enrichHeaders(h -> h.header("errorChannel", errorCh(), true))
            .channel(
                    MessageChannels.executor("testSyncTaskExecutor",
                            syncTaskExecutor()).interceptor(
                            testExecutorChannelInterceptor()))
            .handle(Message.class, (m, h) -> {
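                    // headers.get(..) returns Object; compare null-safely instead of auto-unboxing
                    boolean forceTestError = Boolean.TRUE.equals(m.getHeaders().get("forceTestError"));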
                    if (forceTestError) {
                        logger.trace("simulated forced TestException");
                        TestException testException = new TestException(
                                "forced test exception");
                        throw testException;
                    }

                    logger.trace("simulated successful process");
            }).channel("nullChannel").get();
}

It won't work, simply because your ErrorHandler only runs after the TX has already finished.

Here are a couple of lines of source code (AbstractPollingEndpoint.Poller):

@Override
public void run() {
    taskExecutor.execute(new Runnable() {
        @Override
        public void run() {

           .............
                try {
                        if (!pollingTask.call()) {
                            break;
                        }
                        count++;
                    }
                    catch (Exception e) {
            ....
                    }
                }
            }
        });
    }

Where:

  1. The ErrorHandler is applied to the taskExecutor (a SyncTaskExecutor by default).

  2. The TransactionInterceptor is applied as an Aspect, via a Proxy around the pollingTask.

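Hence the TX begins and ends around pollingTask.call(), so it has already been rolled back by the time the exception leaves that call. Only after that does your ErrorHandler start to work, inside taskExecutor.execute().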
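To make the ordering concrete, here is a minimal plain-Java sketch with no Spring involved. The `transactional` and `executeWithErrorHandler` helpers are hypothetical stand-ins I made up for the TransactionInterceptor proxy and the error-handling taskExecutor; they only mimic the nesting described above and are not the framework's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

class TxOrderingSketch {

    // Records the order in which things happen.
    static final List<String> events = new ArrayList<>();

    // Hypothetical stand-in for the TransactionInterceptor proxy around pollingTask:
    // commits on normal return, rolls back when an exception passes through.
    static Callable<Boolean> transactional(Callable<Boolean> task) {
        return () -> {
            events.add("tx begin");
            try {
                Boolean result = task.call();
                events.add("tx commit");
                return result;
            } catch (Exception e) {
                events.add("tx rollback");
                throw e;
            }
        };
    }

    // Hypothetical stand-in for the taskExecutor that applies the ErrorHandler:
    // it only sees the exception after it has left the transactional proxy.
    static void executeWithErrorHandler(Callable<Boolean> pollingTask) {
        try {
            pollingTask.call();
        } catch (Exception e) {
            events.add("error handler: " + e.getMessage());
        }
    }

    static List<String> run() {
        events.clear();
        Callable<Boolean> pollingTask = transactional(() -> {
            throw new IllegalStateException("forced test exception");
        });
        executeWithErrorHandler(pollingTask);
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch the recorded order is "tx begin", "tx rollback", then the error handler entry, which is why nothing done inside the handler can turn that rollback into a commit.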

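To fix your issue, you need to figure out which part of the downstream flow isn't critical enough to warrant a TX rollback, and either put a try...catch there or use an ExpressionEvaluatingRequestHandlerAdvice to "burke" (swallow) the RuntimeException.

But, as you will have noticed from my reasoning, that must be done inside the TX.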
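As a rough illustration of "inside the TX", the plain-Java sketch below traps the exception before it reaches the transactional boundary, so the boundary sees a normal return and commits. The `transactional` and `trapping` helpers are hypothetical names of mine; `trapping` plays the role that a try...catch, or an advice configured with trapException set to true, would play in the real flow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

class TrapInsideTxSketch {

    // Records the order in which things happen.
    static final List<String> events = new ArrayList<>();

    // Hypothetical stand-in for the transactional proxy around the polling task.
    static Callable<Boolean> transactional(Callable<Boolean> task) {
        return () -> {
            events.add("tx begin");
            try {
                Boolean result = task.call();
                events.add("tx commit");
                return result;
            } catch (Exception e) {
                events.add("tx rollback");
                throw e;
            }
        };
    }

    // Hypothetical stand-in for a try...catch (or trapping advice) placed
    // downstream of the poller, i.e. nested inside the transaction.
    static Callable<Boolean> trapping(Callable<Boolean> handler) {
        return () -> {
            try {
                return handler.call();
            } catch (RuntimeException e) {
                events.add("handled inside tx: " + e.getMessage());
                return true; // swallow: the caller sees a normal return
            }
        };
    }

    static List<String> run() {
        events.clear();
        Callable<Boolean> failingHandler = () -> {
            throw new IllegalStateException("forced test exception");
        };
        try {
            transactional(trapping(failingHandler)).call();
        } catch (Exception e) {
            events.add("unexpected: " + e);
        }
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Here the recorded order is "tx begin", the "handled inside tx" entry, then "tx commit". The trade-off is that whatever the trapped handler did not finish is committed as-is, so this only suits parts of the flow whose failure should not undo the poll.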