Spring 使用 SimpleAsyncTaskExecutor 的批处理未保存到数据库

Spring Batch with SimpleAsyncTaskExecutor is not saving to DB

我有一个简单的 Spring 引导应用程序,它有一个 enpoint,它通过在 JobLauncher bean 中配置的 SimpleAsyncTaskExecutor 异步调用 Spring 批处理作业。

Spring 批处理作业异步启动并工作正常,但没有任何内容保存到数据库中。

如果我删除 SimpleAsyncTaskExecutor,数据将被保存。

这是我的 BatchConfigurer。我在这里使用 SimpleAsyncTaskExecutor 配置 JobLauncher。如果我删除行 simpleJobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor()); // (1) 数据被保存。

@Component
public class CustomBatchConfiguration implements BatchConfigurer {

private static final Log LOGGER = LogFactory.getLog(CustomBatchConfiguration.class);

@Autowired
private BatchProperties properties;

@Autowired
private DataSource dataSource;

@Autowired
private EntityManagerFactory entityManagerFactory;

private PlatformTransactionManager transactionManager;

private JobRepository jobRepository;

private JobLauncher jobLauncher;

private JobExplorer jobExplorer;

/**
 * Registers {@link JobRepository} bean.
 */
@Override
public JobRepository getJobRepository() {
    return this.jobRepository;
}

/**
 * Registers {@link PlatformTransactionManager} bean.
 */
@Override
public PlatformTransactionManager getTransactionManager() {
    return this.transactionManager;
}

/**
 * Registers {@link JobLauncher} bean.
 */
@Override
public JobLauncher getJobLauncher() {
    return this.jobLauncher;
}

/**
 * Registers {@link JobExplorer} bean. This bean is actually created in
 * {@link BatchConfig}.
 */
@Override
public JobExplorer getJobExplorer() throws Exception {
    return this.jobExplorer;
}

/**
 * Initializes Spring Batch components.
 */
@PostConstruct
public void initialize() {
    try {
        this.transactionManager = createTransactionManager();
        this.jobRepository = createJobRepository();
        this.jobLauncher = createJobLauncher();
        this.jobExplorer = createJobExplorer();
    } catch (Exception ex) {
        throw new IllegalStateException("Unable to initialize Spring Batch", ex);
    }
}

private JobExplorer createJobExplorer() throws Exception {
    JobExplorerFactoryBean jobExplorerFactoryBean = new JobExplorerFactoryBean();
    jobExplorerFactoryBean.setDataSource(this.dataSource);
    String tablePrefix = this.properties.getTablePrefix();

    if (StringUtils.hasText(tablePrefix)) {
        jobExplorerFactoryBean.setTablePrefix(tablePrefix);
    }

    jobExplorerFactoryBean.afterPropertiesSet();
    return jobExplorerFactoryBean.getObject();
}

private JobLauncher createJobLauncher() throws Exception {
    SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();

    simpleJobLauncher.setJobRepository(getJobRepository());
    simpleJobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor()); // (1)
    simpleJobLauncher.afterPropertiesSet();

    return simpleJobLauncher;
}

private JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();

    jobRepositoryFactoryBean.setDatabaseType("db2");
    jobRepositoryFactoryBean.setDataSource(this.dataSource);

    if (this.entityManagerFactory != null) {
        LOGGER.warn("JPA does not support custom isolation levels, so locks may not be taken when launching Jobs");
        jobRepositoryFactoryBean.setIsolationLevelForCreate("ISOLATION_DEFAULT");
    }

    String tablePrefix = this.properties.getTablePrefix();
    if (StringUtils.hasText(tablePrefix)) {
        jobRepositoryFactoryBean.setTablePrefix(tablePrefix);
    }

    jobRepositoryFactoryBean.setTransactionManager(getTransactionManager());
    jobRepositoryFactoryBean.afterPropertiesSet();

    return jobRepositoryFactoryBean.getObject();
}

private PlatformTransactionManager createTransactionManager() {
    return new DataSourceTransactionManager(dataSource);
}

}

这是我的 BatchConfiguration。

@Autowired(required = true)
private MyItemReader myItemReader;

@Autowired(required = true)
private MyItemProcessor myItemProcessor;

@Autowired(required = true)
private MyItemWriter myItemWriter;


@Bean
public Step myStep(TaskExecutor taskExecutor) { 

    return stepBuilderFactory.get("myStepName")
            .<SomeWrapper, SomeWrapper>chunk(
                    1)
            .reader(myItemReader)
            .processor(myItemProcessor).writer(myItemWriter)

            .build();
}

@Bean(name = "myJob")
public Job myJob(Step myStep) {
    return jobBuilderFactory.get("myJobName").incrementer(new RunIdIncrementer())
            .flow(myStep).end().build();
}

我错过了什么?

提前致谢

我做了以下事情来解决我的问题:

  1. CustomBatchConfiguration我改了下面

    private PlatformTransactionManager createTransactionManager() {        
        return new DataSourceTransactionManager(dataSource);
    }
    

private PlatformTransactionManager createTransactionManager() {
    if (this.entityManagerFactory != null) {
        return new JpaTransactionManager(this.entityManagerFactory);
    }

    return new DataSourceTransactionManager(this.dataSource);
}

我使用了 spring JpaTransactionManager。

  1. 我将 JobLauncher TaskExecutor 从

    simpleJobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
    

ThreadPoolTaskExecutor threadPoolExecutor = new ThreadPoolTaskExecutor();
threadPoolExecutor.setMaxPoolSize(springBatchJobThreadPollMaxSize);
threadPoolExecutor.afterPropertiesSet();

TaskExecutor taskExecutor = new DelegatingSecurityContextAsyncTaskExecutor(threadPoolExecutor);

创建更强大的线程池并在它们之间共享 spring 安全上下文。

  1. 我使用了 Tasklet 而不是 Spring 面向批处理块的解决方案。 详情。