如何限制 JdbcPollingChannelAdapter 的事务边界
How to restrict transaction boundaries for JdbcPollingChannelAdapter
我有一个 JdbcPollingChannelAdapter 定义如下:
@Bean
public MessageSource<Object> jdbcMessageSource(DataSource dataSource) {
JdbcPollingChannelAdapter jdbcPollingChannelAdapter = new JdbcPollingChannelAdapter(dataSource,
"SELECT * FROM common_task where due_at <= NOW() and retries < order by due_at ASC FOR UPDATE SKIP LOCKED");
jdbcPollingChannelAdapter.setMaxRowsPerPoll(1);
jdbcPollingChannelAdapter.setUpdateSql("Update common_task set retries = :retries, due_at = due_at + interval '10 minutes' WHERE ID = (:id)");
jdbcPollingChannelAdapter.setUpdatePerRow(true);
jdbcPollingChannelAdapter.setRowMapper(this::mapRow);
jdbcPollingChannelAdapter.setUpdateSqlParameterSourceFactory(this::updateParamSource);
return jdbcPollingChannelAdapter;
}
此的集成流程:
@Bean
public IntegrationFlow pollingFlow(MessageSource<Object> jdbcMessageSource) {
return IntegrationFlows.from(jdbcMessageSource,
c -> c.poller(Pollers.fixedRate(250, TimeUnit.MILLISECONDS)
.maxMessagesPerPoll(1)
.transactional()))
.split()
.channel(taskSourceChannel())
.get();
}
服务激活器定义为
@ServiceActivator(inputChannel = "taskSourceChannel")
public void doSomething(FooTask event) {
//do something but ** not ** within the transaction of the poller.
}
集成流中的轮询器被定义为事务性的。根据我的理解,这将
1.在事务中执行select查询和更新查询。
2.它还会在同一个事务中执行doSomething()方法。
目标:我想做 1 而不是 2。我想做 select 并在事务中更新以确保两者都发生。但是,我不想在同一个事务中执行 doSomething()。如果在 doSomething() 中出现异常,我仍然想保留在轮询期间所做的更新。我怎样才能做到这一点?
这是通过简单的线程转移完成的。因此,您需要的只是离开轮询线程,允许它提交 TX 并在单独的线程中继续处理。
根据您对 .split()
的逻辑,拆分后已经有新的线程处理更好,因此项目甚至会由 doSomething()
并行处理。
只需 ExecutorChannel
即可实现目标。由于您已经拥有 taskSourceChannel()
,只需考虑将其替换为基于某些托管 ThreadPoolTaskExecutor
的 ExecutorChannel
。
及其 Java文档。
简单的Java配置变体是这样的:
@Bean
public MessageChannel taskSourceChannel() {
return new ExecutorChannel(executor());
}
@Bean
public Executor executor() {
return new ThreadPoolTaskExecutor();
}
我有一个 JdbcPollingChannelAdapter 定义如下:
@Bean
public MessageSource<Object> jdbcMessageSource(DataSource dataSource) {
JdbcPollingChannelAdapter jdbcPollingChannelAdapter = new JdbcPollingChannelAdapter(dataSource,
"SELECT * FROM common_task where due_at <= NOW() and retries < order by due_at ASC FOR UPDATE SKIP LOCKED");
jdbcPollingChannelAdapter.setMaxRowsPerPoll(1);
jdbcPollingChannelAdapter.setUpdateSql("Update common_task set retries = :retries, due_at = due_at + interval '10 minutes' WHERE ID = (:id)");
jdbcPollingChannelAdapter.setUpdatePerRow(true);
jdbcPollingChannelAdapter.setRowMapper(this::mapRow);
jdbcPollingChannelAdapter.setUpdateSqlParameterSourceFactory(this::updateParamSource);
return jdbcPollingChannelAdapter;
}
此的集成流程:
@Bean
public IntegrationFlow pollingFlow(MessageSource<Object> jdbcMessageSource) {
return IntegrationFlows.from(jdbcMessageSource,
c -> c.poller(Pollers.fixedRate(250, TimeUnit.MILLISECONDS)
.maxMessagesPerPoll(1)
.transactional()))
.split()
.channel(taskSourceChannel())
.get();
}
服务激活器定义为
@ServiceActivator(inputChannel = "taskSourceChannel")
public void doSomething(FooTask event) {
//do something but ** not ** within the transaction of the poller.
}
集成流中的轮询器被定义为事务性的。根据我的理解,这将 1.在事务中执行select查询和更新查询。 2.它还会在同一个事务中执行doSomething()方法。
目标:我想做 1 而不是 2。我想做 select 并在事务中更新以确保两者都发生。但是,我不想在同一个事务中执行 doSomething()。如果在 doSomething() 中出现异常,我仍然想保留在轮询期间所做的更新。我怎样才能做到这一点?
这是通过简单的线程转移完成的。因此,您需要的只是离开轮询线程,允许它提交 TX 并在单独的线程中继续处理。
根据您对 .split()
的逻辑,拆分后已经有新的线程处理更好,因此项目甚至会由 doSomething()
并行处理。
只需 ExecutorChannel
即可实现目标。由于您已经拥有 taskSourceChannel()
,只需考虑将其替换为基于某些托管 ThreadPoolTaskExecutor
的 ExecutorChannel
。
及其 Java文档。
简单的Java配置变体是这样的:
@Bean
public MessageChannel taskSourceChannel() {
return new ExecutorChannel(executor());
}
@Bean
public Executor executor() {
return new ThreadPoolTaskExecutor();
}