Spring 集成 - JdbcPollingChannelAdapter 在处理异常时提交而不是回滚
Spring Integration - JdbcPollingChannelAdapter commit instead of rollback on handled Exceptions
我正在使用 Spring 4.1.x APIs
、Spring Integration 4.1.x APIs
和 Spring Integration Java DSL 1.0.x APIs
作为 EIP 流,我们使用 JdbcPollingChannelAdapter
作为使用来自 Oracle 数据库 table 的消息流程的入口点。
尽管我们在 JdbcPollingChannelAdapter 的 Poller 上配置了 ErrorHandler,但当 RuntimeException 被抛出并由 ErrorHandler 正确处理之后,我们看到会话的 Transaction 仍然回滚而没有提交。
通读此线程后,我感觉无法阻止回滚并改为强制提交。这样理解对吗?另外,如果有办法,在错误已被安全处理的情况下,强制提交而不是回滚的最干净的做法是什么?
当前配置:
IntegrationConfig.java:
/**
 * Creates the JDBC polling message source used as the entry point of the flow.
 *
 * <p>The select statement reads from {@code SERVICE_TABLE} with
 * {@code for update skip locked}; the update statement deletes rows from the same
 * table by {@code SERVICE_MESSAGE_ID}. The adapter is configured with the shared
 * row mapper, a maximum of one row per poll, and per-row updates enabled.
 *
 * @return the configured {@link JdbcPollingChannelAdapter}
 */
@Bean
public MessageSource<Object> jdbcMessageSource() {
    final String selectSql =
            "select * from SERVICE_TABLE where rownum <= 10 for update skip locked";
    final String deleteSql =
            "delete from SERVICE_TABLE where SERVICE_MESSAGE_ID in (:id)";

    JdbcPollingChannelAdapter source = new JdbcPollingChannelAdapter(dataSource, selectSql);

    // SQL / mapping configuration.
    source.setUpdateSql(deleteSql);
    source.setRowMapper(serviceMessageRowMapper);

    // Polling granularity: one row per poll, update applied per row.
    source.setMaxRowsPerPoll(1);
    source.setUpdatePerRow(true);

    return source;
}
/**
 * Defines the inbound integration flow: messages polled from
 * {@code jdbcMessageSource()} are sent to the channel returned by
 * {@code inProcessCh()}.
 *
 * <p>The poller is configured as transactional with the injected transaction
 * manager and uses the injected error handler.
 *
 * @return the assembled {@link IntegrationFlow}
 */
@SuppressWarnings("unchecked")
@Bean
public IntegrationFlow inFlow() {
return IntegrationFlows
.from(jdbcMessageSource(),
c -> {
// Fixed-rate poller (period 100 — presumably milliseconds, confirm against Pollers API),
// at most 10 messages per poll.
c.poller(Pollers.fixedRate(100)
.maxMessagesPerPoll(10)
// Each poll is wrapped in a transaction managed by transactionManager.
.transactional(transactionManager)
// Errors raised during polling are handed to errorHandler.
.errorHandler(errorHandler));
})
.channel(inProcessCh()).get();
}
ErrorHandler.java
/**
 * Spring {@code ErrorHandler} component that logs the received error and then
 * obtains a transaction from the injected transaction manager and commits it.
 *
 * <p>NOTE(review): the surrounding discussion explains that a poller error handler
 * runs only after the poller's transaction has already completed. The commit in
 * {@link #handleError(Throwable)} therefore presumably acts on a different
 * transaction than the one that was rolled back — confirm before relying on it.
 */
@Component
public class ErrorHandler implements org.springframework.util.ErrorHandler {
// Transaction manager used by handleError to obtain and commit a transaction.
@Autowired
private PlatformTransactionManager transactionManager;
private static final Logger logger = LogManager.getLogger();
/**
 * Logs the error at trace level, then requests a transaction with a
 * {@code null} definition and commits it.
 *
 * @param t the error passed to this handler
 */
@Override
public void handleError(Throwable t) {
logger.trace("handling error:{}", t.getMessage(), t);
// handle error code here...
// we want to force commit the transaction here?
// NOTE(review): a null definition presumably means default transaction settings,
// which may start a new transaction if none is active — verify.
TransactionStatus txStatus = transactionManager.getTransaction(null);
transactionManager.commit(txStatus);
}
}
--- 编辑以包含 ExpressionEvaluatingRequestHandlerAdvice Bean ---
/**
 * Creates the request-handler advice attached to the test message handler.
 *
 * <p>The advice is configured to trap exceptions, evaluates {@code "payload"} for
 * both the success and the failure expression, and uses {@code errorCh()} as its
 * failure channel.
 *
 * @return the configured advice
 */
@Bean
public Advice expressionEvaluatingRequestHandlerAdvice() {
    ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setTrapException(true);
    advice.setOnSuccessExpression("payload");
    advice.setOnFailureExpression("payload");
    advice.setFailureChannel(errorCh());
    return advice;
}
--- 已编辑以显示虚拟测试消息处理程序 ---
// Dummy test handler: throws a TestException when the "forceTestError" header is set,
// otherwise logs success and returns the incoming message unchanged.
// This is an excerpt from a longer flow definition; the enclosing call is not closed here.
.handle(Message.class,
(m, h) -> {
// NOTE(review): a header lookup by key normally yields Object, so assigning it
// directly to boolean probably needs a cast or a typed getter — confirm.
boolean forceTestError = m.getHeaders().get("forceTestError");
if (forceTestError) {
logger.trace("simulated forced TestException");
TestException testException = new TestException(
"forced test exception");
throw testException;
}
logger.trace("simulated successful process");
return m;
// The handler endpoint is wrapped with the advice bean defined by
// expressionEvaluatingRequestHandlerAdvice().
}, e-> e.advice(expressionEvaluatingRequestHandlerAdvice())
--- 编辑显示 ExecutorChannelInterceptor 方法 ---
/**
 * Variant of the inbound flow that routes through an executor channel with a
 * channel interceptor.
 *
 * <p>Messages polled from {@code jdbcMessageSource()} (transactional poller) get an
 * {@code errorChannel} header pointing at {@code errorCh()}, pass through an
 * executor channel backed by {@code syncTaskExecutor()} and intercepted by
 * {@code testExecutorChannelInterceptor()}, and are then processed by a dummy
 * handler whose output is sent to {@code nullChannel}.
 *
 * @return the assembled {@link IntegrationFlow}
 */
@Bean
public IntegrationFlow inFlow() {
    return IntegrationFlows
            .from(jdbcMessageSource(), c -> {
                c.poller(Pollers.fixedRate(100).maxMessagesPerPoll(10)
                        .transactional(transactionManager));
            })
            .enrichHeaders(h -> h.header("errorChannel", errorCh(), true))
            .channel(
                    MessageChannels.executor("testSyncTaskExecutor",
                            syncTaskExecutor()).interceptor(
                            testExecutorChannelInterceptor()))
            .handle(Message.class, (m, h) -> {
                // The header lookup yields Object, which cannot be assigned to a
                // boolean directly. Boolean.TRUE.equals(...) is null-safe: an absent
                // header (or any value other than Boolean.TRUE) means "no forced error".
                boolean forceTestError =
                        Boolean.TRUE.equals(m.getHeaders().get("forceTestError"));
                if (forceTestError) {
                    logger.trace("simulated forced TestException");
                    throw new TestException("forced test exception");
                }
                logger.trace("simulated successful process");
                // The two-argument handler lambda must return a value; pass the
                // message on so it reaches the terminating nullChannel.
                return m;
            }).channel("nullChannel").get();
}
它之所以不起作用,是因为你的 ErrorHandler 是在 TX 完成之后才开始工作的。
这是几行源代码 (AbstractPollingEndpoint.Poller
):
// Abridged excerpt of the poller's run() method; dotted lines mark code that was elided.
@Override
public void run() {
// The whole polling loop is submitted to taskExecutor; per the accompanying
// explanation, the configured ErrorHandler is applied at this executor level.
taskExecutor.execute(new Runnable() {
@Override
public void run() {
.............
try {
// Per the accompanying explanation, pollingTask is proxied by the transaction
// interceptor, so the transaction starts and finishes around this call.
if (!pollingTask.call()) {
break;
}
count++;
}
catch (Exception e) {
....
}
}
}
});
}
其中:
ErrorHandler 应用在 taskExecutor 上(默认是 SyncTaskExecutor)。
TransactionInterceptor 作为 Aspect,以 Proxy 的形式应用在 pollingTask 周围。
因此 TX 是围绕 pollingTask.call() 开始并完成的。只有在那之后,你的 ErrorHandler 才会在 taskExecutor.execute() 内部开始工作。
要解决您的问题,您需要找出下游流程中哪一部分对 TX 回滚并不那么关键,并在那里加上 try...catch,或者使用 ExpressionEvaluatingRequestHandlerAdvice 来“压制”(burke)该 RuntimeException。
但是正如您从我的推理中注意到的那样,这必须在 TX 内部完成。
我正在使用 Spring 4.1.x APIs
、Spring Integration 4.1.x APIs
和 Spring Integration Java DSL 1.0.x APIs
作为 EIP 流,我们使用 JdbcPollingChannelAdapter
作为使用来自 Oracle 数据库 table 的消息流程的入口点。
尽管我们在 JdbcPollingChannelAdapter 的 Poller 上配置了 ErrorHandler,但当 RuntimeException 被抛出并由 ErrorHandler 正确处理之后,我们看到会话的 Transaction 仍然回滚而没有提交。
通读此线程后:
当前配置:
IntegrationConfig.java:
/**
 * Creates the JDBC polling message source used as the entry point of the flow.
 *
 * <p>The select statement reads from {@code SERVICE_TABLE} with
 * {@code for update skip locked}; the update statement deletes rows from the same
 * table by {@code SERVICE_MESSAGE_ID}. The adapter is configured with the shared
 * row mapper, a maximum of one row per poll, and per-row updates enabled.
 *
 * @return the configured {@link JdbcPollingChannelAdapter}
 */
@Bean
public MessageSource<Object> jdbcMessageSource() {
    final String selectSql =
            "select * from SERVICE_TABLE where rownum <= 10 for update skip locked";
    final String deleteSql =
            "delete from SERVICE_TABLE where SERVICE_MESSAGE_ID in (:id)";

    JdbcPollingChannelAdapter source = new JdbcPollingChannelAdapter(dataSource, selectSql);

    // SQL / mapping configuration.
    source.setUpdateSql(deleteSql);
    source.setRowMapper(serviceMessageRowMapper);

    // Polling granularity: one row per poll, update applied per row.
    source.setMaxRowsPerPoll(1);
    source.setUpdatePerRow(true);

    return source;
}
/**
 * Defines the inbound integration flow: messages polled from
 * {@code jdbcMessageSource()} are sent to the channel returned by
 * {@code inProcessCh()}.
 *
 * <p>The poller is configured as transactional with the injected transaction
 * manager and uses the injected error handler.
 *
 * @return the assembled {@link IntegrationFlow}
 */
@SuppressWarnings("unchecked")
@Bean
public IntegrationFlow inFlow() {
return IntegrationFlows
.from(jdbcMessageSource(),
c -> {
// Fixed-rate poller (period 100 — presumably milliseconds, confirm against Pollers API),
// at most 10 messages per poll.
c.poller(Pollers.fixedRate(100)
.maxMessagesPerPoll(10)
// Each poll is wrapped in a transaction managed by transactionManager.
.transactional(transactionManager)
// Errors raised during polling are handed to errorHandler.
.errorHandler(errorHandler));
})
.channel(inProcessCh()).get();
}
ErrorHandler.java
/**
 * Spring {@code ErrorHandler} component that logs the received error and then
 * obtains a transaction from the injected transaction manager and commits it.
 *
 * <p>NOTE(review): the surrounding discussion explains that a poller error handler
 * runs only after the poller's transaction has already completed. The commit in
 * {@link #handleError(Throwable)} therefore presumably acts on a different
 * transaction than the one that was rolled back — confirm before relying on it.
 */
@Component
public class ErrorHandler implements org.springframework.util.ErrorHandler {
// Transaction manager used by handleError to obtain and commit a transaction.
@Autowired
private PlatformTransactionManager transactionManager;
private static final Logger logger = LogManager.getLogger();
/**
 * Logs the error at trace level, then requests a transaction with a
 * {@code null} definition and commits it.
 *
 * @param t the error passed to this handler
 */
@Override
public void handleError(Throwable t) {
logger.trace("handling error:{}", t.getMessage(), t);
// handle error code here...
// we want to force commit the transaction here?
// NOTE(review): a null definition presumably means default transaction settings,
// which may start a new transaction if none is active — verify.
TransactionStatus txStatus = transactionManager.getTransaction(null);
transactionManager.commit(txStatus);
}
}
--- 编辑以包含 ExpressionEvaluatingRequestHandlerAdvice Bean ---
/**
 * Creates the request-handler advice attached to the test message handler.
 *
 * <p>The advice is configured to trap exceptions, evaluates {@code "payload"} for
 * both the success and the failure expression, and uses {@code errorCh()} as its
 * failure channel.
 *
 * @return the configured advice
 */
@Bean
public Advice expressionEvaluatingRequestHandlerAdvice() {
    ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setTrapException(true);
    advice.setOnSuccessExpression("payload");
    advice.setOnFailureExpression("payload");
    advice.setFailureChannel(errorCh());
    return advice;
}
--- 已编辑以显示虚拟测试消息处理程序 ---
// Dummy test handler: throws a TestException when the "forceTestError" header is set,
// otherwise logs success and returns the incoming message unchanged.
// This is an excerpt from a longer flow definition; the enclosing call is not closed here.
.handle(Message.class,
(m, h) -> {
// NOTE(review): a header lookup by key normally yields Object, so assigning it
// directly to boolean probably needs a cast or a typed getter — confirm.
boolean forceTestError = m.getHeaders().get("forceTestError");
if (forceTestError) {
logger.trace("simulated forced TestException");
TestException testException = new TestException(
"forced test exception");
throw testException;
}
logger.trace("simulated successful process");
return m;
// The handler endpoint is wrapped with the advice bean defined by
// expressionEvaluatingRequestHandlerAdvice().
}, e-> e.advice(expressionEvaluatingRequestHandlerAdvice())
--- 编辑显示 ExecutorChannelInterceptor 方法 ---
/**
 * Variant of the inbound flow that routes through an executor channel with a
 * channel interceptor.
 *
 * <p>Messages polled from {@code jdbcMessageSource()} (transactional poller) get an
 * {@code errorChannel} header pointing at {@code errorCh()}, pass through an
 * executor channel backed by {@code syncTaskExecutor()} and intercepted by
 * {@code testExecutorChannelInterceptor()}, and are then processed by a dummy
 * handler whose output is sent to {@code nullChannel}.
 *
 * @return the assembled {@link IntegrationFlow}
 */
@Bean
public IntegrationFlow inFlow() {
    return IntegrationFlows
            .from(jdbcMessageSource(), c -> {
                c.poller(Pollers.fixedRate(100).maxMessagesPerPoll(10)
                        .transactional(transactionManager));
            })
            .enrichHeaders(h -> h.header("errorChannel", errorCh(), true))
            .channel(
                    MessageChannels.executor("testSyncTaskExecutor",
                            syncTaskExecutor()).interceptor(
                            testExecutorChannelInterceptor()))
            .handle(Message.class, (m, h) -> {
                // The header lookup yields Object, which cannot be assigned to a
                // boolean directly. Boolean.TRUE.equals(...) is null-safe: an absent
                // header (or any value other than Boolean.TRUE) means "no forced error".
                boolean forceTestError =
                        Boolean.TRUE.equals(m.getHeaders().get("forceTestError"));
                if (forceTestError) {
                    logger.trace("simulated forced TestException");
                    throw new TestException("forced test exception");
                }
                logger.trace("simulated successful process");
                // The two-argument handler lambda must return a value; pass the
                // message on so it reaches the terminating nullChannel.
                return m;
            }).channel("nullChannel").get();
}
它之所以不起作用,是因为你的 ErrorHandler 是在 TX 完成之后才开始工作的。
这是几行源代码 (AbstractPollingEndpoint.Poller
):
// Abridged excerpt of the poller's run() method; dotted lines mark code that was elided.
@Override
public void run() {
// The whole polling loop is submitted to taskExecutor; per the accompanying
// explanation, the configured ErrorHandler is applied at this executor level.
taskExecutor.execute(new Runnable() {
@Override
public void run() {
.............
try {
// Per the accompanying explanation, pollingTask is proxied by the transaction
// interceptor, so the transaction starts and finishes around this call.
if (!pollingTask.call()) {
break;
}
count++;
}
catch (Exception e) {
....
}
}
}
});
}
其中:
ErrorHandler 应用在 taskExecutor 上(默认是 SyncTaskExecutor)。
TransactionInterceptor 作为 Aspect,以 Proxy 的形式应用在 pollingTask 周围。
因此 TX 是围绕 pollingTask.call() 开始并完成的。只有在那之后,你的 ErrorHandler 才会在 taskExecutor.execute() 内部开始工作。
要解决您的问题,您需要找出下游流程中哪一部分对 TX 回滚并不那么关键,并在那里加上 try...catch,或者使用 ExpressionEvaluatingRequestHandlerAdvice 来“压制”(burke)该 RuntimeException。
但是正如您从我的推理中注意到的那样,这必须在 TX 内部完成。