JOOQ 和 Spring 批量交易管理
JOOQ and Spring Batch transaction management
我目前正在开发 Spring Batch (2.6.3) 应用程序,我正在使用 JOOQ (3.14.15) 访问 MariaDB 数据库 (10.5.8)。
到目前为止,我从事的工作很少,并且到了测试交易的地步。
我正在尝试 运行 tasklet,据我所知,默认情况下在事务中执行。我用简单的插入制作了存储库,并用 @Transactional
进行了注释。我已经添加了 Thread.sleep
并且我的“测试用例”是我让 Spring 批量执行我的测试 tasklet 并且在睡眠期间我杀死了我的应用程序。我预计 T_FOO
table 中不会有记录,因为我预计交易会回滚,但我的 T_FOO
table 中有记录,即使在 BATCH_STEP_EXECUTION
我的测试步骤(应用程序关闭后)的状态为 STARTED
,END_TIME
设置为 null
- 我猜这是在执行步骤时应用程序关闭时预期的表示。
我猜我没有为 Spring Batch 和 JOOQ 或类似的东西设置正确的事务管理?但当然也有可能猜错
有人可以帮我解决这个问题吗?我正在尝试 google 如何使用 Spring Batch 正确设置 JOOQ,但我能找到的唯一资源是这个 one 我写了受那篇文章启发的 jooq 配置,我也试过复制在文章中粘贴配置,但它没有解决我的问题。
提前感谢您的宝贵时间。
Tasklet 测试步骤
public Step testStep() {
return this.stepBuilderFactory.get(TEST_STEP)
.tasklet((stepContribution, chunkContext) -> {
final Integer batchId = JobUtils.getBatchId(chunkContext);
final LocalDateTime batchDate = JobUtils.getBatchDate(chunkContext);
importRepository.importBatch(batchId, batchDate);
/* Thread.sleep is here only for testing purposes - at this sleep I am shuting down application */
log.debug("Waiting start");
Thread.sleep(30000);
log.debug("Waiting end");
return RepeatStatus.FINISHED;
}).listener(loadingProcessingErrorListener)
.build();
}
Transactional(propagation = Propagation.REQUIRED)
public void importBatch(@NonNull Integer batchId, @NonNull LocalDateTime batchDate) {
/* simple insert select statement */
context.insertInto(T_FOO)
select(context.select(asterisk()).from(T_BAR))
execute();
}
批量配置
@Configuration
@RequiredArgsConstructor
public class BatchConfiguration extends DefaultBatchConfigurer {
private final JobRegistry jobRegistry;
private final DataSource dataSource;
@Override
@NonNull
@SneakyThrows
public JobLauncher getJobLauncher() {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(getJobRepository());
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() {
JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
postProcessor.setJobRegistry(jobRegistry);
return postProcessor;
}
@Override
@NonNull
public PlatformTransactionManager getTransactionManager() {
return new DataSourceTransactionManager(dataSource);
}
}
JOOQ配置
@Configuration
@RequiredArgsConstructor
public class JooqConfiguration {
private final DataSource dataSource;
@Bean
public DataSourceConnectionProvider connectionProvider() {
return new DataSourceConnectionProvider(new TransactionAwareDataSourceProxy(dataSource));
}
@Bean
public DefaultDSLContext context() {
return new DefaultDSLContext(configuration());
}
public DefaultConfiguration configuration() {
DefaultConfiguration jooqConfiguration = new DefaultConfiguration();
jooqConfiguration.set(connectionProvider());
jooqConfiguration.setDataSource(dataSource);
jooqConfiguration.setSQLDialect(SQLDialect.MARIADB);
jooqConfiguration.setSettings(new Settings().withExecuteWithOptimisticLocking(true));
return jooqConfiguration;
}
}
控制台日志
2022-05-28 21:26:18.571 DEBUG 15656 --- [cTaskExecutor-1] s.o.p.c.p.a.j.i.ImportLoadingSteps : Starting loading import for Batch ID [1]
2022-05-28 21:26:18.607 INFO 15656 --- [cTaskExecutor-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [testStep]
2022-05-28 21:26:18.861 DEBUG 15656 --- [cTaskExecutor-1] org.jooq.tools.LoggerListener : Executing query : insert into `t_foo`...
2022-05-28 21:26:18.875 DEBUG 15656 --- [cTaskExecutor-1] org.jooq.tools.LoggerListener : Affected row(s) : 1
2022-05-28 21:26:18.876 DEBUG 15656 --- [cTaskExecutor-1] s.o.p.c.p.a.j.i.ImportLoadingSteps : Waiting start
2022-05-28 21:26:23.759 INFO 15656 --- [ionShutdownHook] org.quartz.core.QuartzScheduler : Scheduler quartzScheduler_$_NON_CLUSTERED paused.
2022-05-28 21:26:23.783 INFO 15656 --- [ionShutdownHook] o.s.s.quartz.SchedulerFactoryBean : Shutting down Quartz Scheduler
2022-05-28 21:26:23.783 INFO 15656 --- [ionShutdownHook] org.quartz.core.QuartzScheduler : Scheduler quartzScheduler_$_NON_CLUSTERED shutting down.
2022-05-28 21:26:23.783 INFO 15656 --- [ionShutdownHook] org.quartz.core.QuartzScheduler : Scheduler quartzScheduler_$_NON_CLUSTERED paused.
2022-05-28 21:26:23.784 INFO 15656 --- [ionShutdownHook] org.quartz.core.QuartzScheduler : Scheduler quartzScheduler_$_NON_CLUSTERED shutdown complete.
2022-05-28 21:26:23.790 INFO 15656 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
Process finished with exit code -1
您正在这样做:
jooqConfiguration.set(connectionProvider());
但是,你也没有必要这样做:
jooqConfiguration.setDataSource(dataSource);
后者覆盖了前者,删除了 TransactionAwareDataSourceProxy
语义,并且也没有其他目的。只需删除该调用即可。
我目前正在开发 Spring Batch (2.6.3) 应用程序,我正在使用 JOOQ (3.14.15) 访问 MariaDB 数据库 (10.5.8)。
到目前为止,我从事的工作很少,并且到了测试交易的地步。
我正在尝试 运行 tasklet,据我所知,默认情况下在事务中执行。我用简单的插入制作了存储库,并用 @Transactional
进行了注释。我已经添加了 Thread.sleep
并且我的“测试用例”是我让 Spring 批量执行我的测试 tasklet 并且在睡眠期间我杀死了我的应用程序。我预计 T_FOO
table 中不会有记录,因为我预计交易会回滚,但我的 T_FOO
table 中有记录,即使在 BATCH_STEP_EXECUTION
我的测试步骤(应用程序关闭后)的状态为 STARTED
,END_TIME
设置为 null
- 我猜这是在执行步骤时应用程序关闭时预期的表示。
我猜我没有为 Spring Batch 和 JOOQ 或类似的东西设置正确的事务管理?但当然也有可能猜错
有人可以帮我解决这个问题吗?我正在尝试 google 如何使用 Spring Batch 正确设置 JOOQ,但我能找到的唯一资源是这个 one 我写了受那篇文章启发的 jooq 配置,我也试过复制在文章中粘贴配置,但它没有解决我的问题。
提前感谢您的宝贵时间。
Tasklet 测试步骤
public Step testStep() {
return this.stepBuilderFactory.get(TEST_STEP)
.tasklet((stepContribution, chunkContext) -> {
final Integer batchId = JobUtils.getBatchId(chunkContext);
final LocalDateTime batchDate = JobUtils.getBatchDate(chunkContext);
importRepository.importBatch(batchId, batchDate);
/* Thread.sleep is here only for testing purposes - at this sleep I am shuting down application */
log.debug("Waiting start");
Thread.sleep(30000);
log.debug("Waiting end");
return RepeatStatus.FINISHED;
}).listener(loadingProcessingErrorListener)
.build();
}
Transactional(propagation = Propagation.REQUIRED)
public void importBatch(@NonNull Integer batchId, @NonNull LocalDateTime batchDate) {
/* simple insert select statement */
context.insertInto(T_FOO)
select(context.select(asterisk()).from(T_BAR))
execute();
}
批量配置
@Configuration
@RequiredArgsConstructor
public class BatchConfiguration extends DefaultBatchConfigurer {
private final JobRegistry jobRegistry;
private final DataSource dataSource;
@Override
@NonNull
@SneakyThrows
public JobLauncher getJobLauncher() {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(getJobRepository());
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() {
JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
postProcessor.setJobRegistry(jobRegistry);
return postProcessor;
}
@Override
@NonNull
public PlatformTransactionManager getTransactionManager() {
return new DataSourceTransactionManager(dataSource);
}
}
JOOQ配置
@Configuration
@RequiredArgsConstructor
public class JooqConfiguration {
private final DataSource dataSource;
@Bean
public DataSourceConnectionProvider connectionProvider() {
return new DataSourceConnectionProvider(new TransactionAwareDataSourceProxy(dataSource));
}
@Bean
public DefaultDSLContext context() {
return new DefaultDSLContext(configuration());
}
public DefaultConfiguration configuration() {
DefaultConfiguration jooqConfiguration = new DefaultConfiguration();
jooqConfiguration.set(connectionProvider());
jooqConfiguration.setDataSource(dataSource);
jooqConfiguration.setSQLDialect(SQLDialect.MARIADB);
jooqConfiguration.setSettings(new Settings().withExecuteWithOptimisticLocking(true));
return jooqConfiguration;
}
}
控制台日志
2022-05-28 21:26:18.571 DEBUG 15656 --- [cTaskExecutor-1] s.o.p.c.p.a.j.i.ImportLoadingSteps : Starting loading import for Batch ID [1]
2022-05-28 21:26:18.607 INFO 15656 --- [cTaskExecutor-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [testStep]
2022-05-28 21:26:18.861 DEBUG 15656 --- [cTaskExecutor-1] org.jooq.tools.LoggerListener : Executing query : insert into `t_foo`...
2022-05-28 21:26:18.875 DEBUG 15656 --- [cTaskExecutor-1] org.jooq.tools.LoggerListener : Affected row(s) : 1
2022-05-28 21:26:18.876 DEBUG 15656 --- [cTaskExecutor-1] s.o.p.c.p.a.j.i.ImportLoadingSteps : Waiting start
2022-05-28 21:26:23.759 INFO 15656 --- [ionShutdownHook] org.quartz.core.QuartzScheduler : Scheduler quartzScheduler_$_NON_CLUSTERED paused.
2022-05-28 21:26:23.783 INFO 15656 --- [ionShutdownHook] o.s.s.quartz.SchedulerFactoryBean : Shutting down Quartz Scheduler
2022-05-28 21:26:23.783 INFO 15656 --- [ionShutdownHook] org.quartz.core.QuartzScheduler : Scheduler quartzScheduler_$_NON_CLUSTERED shutting down.
2022-05-28 21:26:23.783 INFO 15656 --- [ionShutdownHook] org.quartz.core.QuartzScheduler : Scheduler quartzScheduler_$_NON_CLUSTERED paused.
2022-05-28 21:26:23.784 INFO 15656 --- [ionShutdownHook] org.quartz.core.QuartzScheduler : Scheduler quartzScheduler_$_NON_CLUSTERED shutdown complete.
2022-05-28 21:26:23.790 INFO 15656 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
Process finished with exit code -1
您正在这样做:
jooqConfiguration.set(connectionProvider());
但是,你也没有必要这样做:
jooqConfiguration.setDataSource(dataSource);
后者覆盖了前者,删除了 TransactionAwareDataSourceProxy
语义,并且也没有其他目的。只需删除该调用即可。