JOOQ 和 Spring 批量交易管理

JOOQ and Spring Batch transaction management

我目前正在开发 Spring Batch (2.6.3) 应用程序,我正在使用 JOOQ (3.14.15) 访问 MariaDB 数据库 (10.5.8)。 到目前为止,我从事的工作很少,并且到了测试交易的地步。 我正在尝试 运行 tasklet,据我所知,默认情况下在事务中执行。我用简单的插入制作了存储库,并用 @Transactional 进行了注释。我已经添加了 Thread.sleep 并且我的“测试用例”是我让 Spring 批量执行我的测试 tasklet 并且在睡眠期间我杀死了我的应用程序。我预计 T_FOO table 中不会有记录,因为我预计交易会回滚,但我的 T_FOO table 中有记录,即使在 BATCH_STEP_EXECUTION 我的测试步骤(应用程序关闭后)的状态为 STARTEDEND_TIME 设置为 null - 我猜这是在执行步骤时应用程序关闭时预期的表示。

我猜我没有为 Spring Batch 和 JOOQ 或类似的东西设置正确的事务管理?但当然也有可能猜错

有人可以帮我解决这个问题吗?我正在尝试 google 如何使用 Spring Batch 正确设置 JOOQ,但我能找到的唯一资源是这个 one 我写了受那篇文章启发的 jooq 配置,我也试过复制在文章中粘贴配置,但它没有解决我的问题。

提前感谢您的宝贵时间。


Tasklet 测试步骤

public Step testStep() {
        return this.stepBuilderFactory.get(TEST_STEP)
                .tasklet((stepContribution, chunkContext) -> {
                    final Integer batchId = JobUtils.getBatchId(chunkContext);
                    final LocalDateTime batchDate = JobUtils.getBatchDate(chunkContext);

                    importRepository.importBatch(batchId, batchDate);

                    /* Thread.sleep is here only for testing purposes - at this sleep I am shuting down application */
                    log.debug("Waiting start");
                    Thread.sleep(30000);
                    log.debug("Waiting end");

                    return RepeatStatus.FINISHED;
                }).listener(loadingProcessingErrorListener)
                .build();
    }

Transactional(propagation = Propagation.REQUIRED)
public void importBatch(@NonNull Integer batchId, @NonNull LocalDateTime batchDate) {
    /* simple insert select statement */
    context.insertInto(T_FOO)
        select(context.select(asterisk()).from(T_BAR))
        execute();
}

批量配置

@Configuration
@RequiredArgsConstructor
public class BatchConfiguration extends DefaultBatchConfigurer {

    private final JobRegistry jobRegistry;
    private final DataSource dataSource;

    @Override
    @NonNull
    @SneakyThrows
    public JobLauncher getJobLauncher() {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(getJobRepository());
        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() {
        JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
        postProcessor.setJobRegistry(jobRegistry);
        return postProcessor;
    }

    @Override
    @NonNull
    public PlatformTransactionManager getTransactionManager() {
        return new DataSourceTransactionManager(dataSource);
    }
}

JOOQ配置

@Configuration
@RequiredArgsConstructor
public class JooqConfiguration {

    private final DataSource dataSource;

    @Bean
    public DataSourceConnectionProvider connectionProvider() {
        return new DataSourceConnectionProvider(new TransactionAwareDataSourceProxy(dataSource));
    }

    @Bean
    public DefaultDSLContext context() {
        return new DefaultDSLContext(configuration());
    }

    public DefaultConfiguration configuration() {
        DefaultConfiguration jooqConfiguration = new DefaultConfiguration();

        jooqConfiguration.set(connectionProvider());

        jooqConfiguration.setDataSource(dataSource);
        jooqConfiguration.setSQLDialect(SQLDialect.MARIADB);
        jooqConfiguration.setSettings(new Settings().withExecuteWithOptimisticLocking(true));

        return jooqConfiguration;
    }
}

控制台日志

2022-05-28 21:26:18.571 DEBUG 15656 --- [cTaskExecutor-1] s.o.p.c.p.a.j.i.ImportLoadingSteps       : Starting loading import for Batch ID [1]
2022-05-28 21:26:18.607  INFO 15656 --- [cTaskExecutor-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [testStep]
2022-05-28 21:26:18.861 DEBUG 15656 --- [cTaskExecutor-1] org.jooq.tools.LoggerListener            : Executing query          : insert into `t_foo`...
2022-05-28 21:26:18.875 DEBUG 15656 --- [cTaskExecutor-1] org.jooq.tools.LoggerListener            : Affected row(s)          : 1
2022-05-28 21:26:18.876 DEBUG 15656 --- [cTaskExecutor-1] s.o.p.c.p.a.j.i.ImportLoadingSteps       : Waiting start
2022-05-28 21:26:23.759  INFO 15656 --- [ionShutdownHook] org.quartz.core.QuartzScheduler          : Scheduler quartzScheduler_$_NON_CLUSTERED paused.
2022-05-28 21:26:23.783  INFO 15656 --- [ionShutdownHook] o.s.s.quartz.SchedulerFactoryBean        : Shutting down Quartz Scheduler
2022-05-28 21:26:23.783  INFO 15656 --- [ionShutdownHook] org.quartz.core.QuartzScheduler          : Scheduler quartzScheduler_$_NON_CLUSTERED shutting down.
2022-05-28 21:26:23.783  INFO 15656 --- [ionShutdownHook] org.quartz.core.QuartzScheduler          : Scheduler quartzScheduler_$_NON_CLUSTERED paused.
2022-05-28 21:26:23.784  INFO 15656 --- [ionShutdownHook] org.quartz.core.QuartzScheduler          : Scheduler quartzScheduler_$_NON_CLUSTERED shutdown complete.
2022-05-28 21:26:23.790  INFO 15656 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...

Process finished with exit code -1

您正在这样做:

jooqConfiguration.set(connectionProvider());

但是,你也没有必要这样做:

jooqConfiguration.setDataSource(dataSource);

后者覆盖了前者,删除了 TransactionAwareDataSourceProxy 语义,并且也没有其他目的。只需删除该调用即可。