Spring Batch/Data JPA 应用程序在调用 JPA 存储库(save、saveAll)方法时未 persisting/saving 数据到 Postgres 数据库

Spring Batch/Data JPA application not persisting/saving data to Postgres database when calling JPA repository (save, saveAll) methods

我已经无计可施了。到目前为止,我 read/googled 无休止地尝试了所有 google/Whosebug post 有类似问题的解决方案(有很多)。有些看起来很有前途,但还没有对我有用;尽管我已经取得了一些进展并且我相信我走在正确的轨道上(我相信在这一点上它与事务管理器有关并且可能与 Spring Batch 与 Spring Data JPA 发生冲突) .

参考文献:

  1. JpaItemWriter: no transaction is in progress

类似于前面提到的posts,我有一个Spring Boot应用程序正在使用Spring批处理Spring 数据 JPA。它从 .csv 文件中读取逗号分隔的数据,然后使用 JPA 存储库方法对数据库执行一些 processing/transformation 和 尝试 到 persist/save ,特别是这里 .saveAll()(我也尝试了 .save() 方法,这做了同样的事情),因为我正在保存用户定义数据类型(批量插入)的 List<MyUserDefinedDataType>

现在,我的代码在 Spring Boot starter 1.5.9.RELEASE 上运行良好,但我最近尝试升级到 2.X.X,经过无数小时的调试后我发现,只有版本 2.2.0.RELEASE 会将 persist/save 数据存入数据库。所以升级到 >= 2.2.1.RELEASE 会破坏持久性。从 .csv 中读取的一切都很好,就在代码流第一次遇到像 .save() .saveAll() 这样的 JPA 存储库方法时,应用程序保持 运行 但没有任何内容被保留.我还注意到 Hikari 池日志 "active=1 idle=4",但是当我在版本 1.5.9.RELEASE 上查看同一日志时,它在持久化数据后立即显示 active=0 idle=5,因此应用程序肯定挂了。我进入调试器,甚至在跳入存储库调用后看到,它通过 Spring AOP 库等(所有第三方)进入几乎无限循环,我不相信会回到真正的application/business 我写的逻辑。

3c22fb53ed64 2021-05-20 23:53:43.909 DEBUG
                    [HikariPool-1 housekeeper] com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Pool stats (total=5, active=1, idle=4, waiting=0)

无论如何,我尝试了对其他人有效的最常见解决方案是:

  1. 定义 JpaTransactionManager @Bean 并将其注入 Step 函数,同时使用 PlatformTransactionManager 保留 JobRepository。这 没有 工作。然后我也尝试在 JobRepository @Bean 中使用 JpaTransactionManager,这也 not 工作。
  2. 在我的应用程序中定义一个 @RestController 端点以手动触发此作业,而不是从我的主 Application.java class 手动执行。 (我在下面更多地讨论这个)。根据我在上面 post 编辑的 post 之一,即使在 spring >= 2.2.1 上,数据也正确地保存到数据库中,我进一步怀疑 Spring批次persistence/entity/transaction经理搞砸了。

代码基本上是这样的: BatchConfiguration.java

@Configuration
@EnableBatchProcessing
@Import({DatabaseConfiguration.class})
public class BatchConfiguration {

// Datasource is a Postgres DB defined in separate IntelliJ project that I add to my pom.xml
DataSource dataSource;

@Autowired
public BatchConfiguration(@Qualifier("dataSource") DataSource dataSource) {
    this.dataSource = dataSource;
}

@Bean
@Primary
public JpaTransactionManager jpaTransactionManager() {
    final JpaTransactionManager tm = new JpaTransactionManager();
    tm.setDataSource(dataSource);
    return tm;
}


 @Bean
 public JobRepository jobRepository(PlatformTransactionManager transactionManager) throws Exception {
    JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
    jobRepositoryFactoryBean.setDataSource(dataSource);
    jobRepositoryFactoryBean.setTransactionManager(transactionManager);
    jobRepositoryFactoryBean.setDatabaseType("POSTGRES");
    return jobRepositoryFactoryBean.getObject();
}

@Bean
public JobLauncher jobLauncher(JobRepository jobRepository) {
    SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
    simpleJobLauncher.setJobRepository(jobRepository);
    return simpleJobLauncher;
}

@Bean(name = "jobToLoadTheData")
 public Job jobToLoadTheData() {
    return jobBuilderFactory.get("jobToLoadTheData")
            .start(stepToLoadData())
            .listener(new CustomJobListener())
            .build();
}

@Bean
@StepScope
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
    threadPoolTaskExecutor.setCorePoolSize(maxThreads);
    threadPoolTaskExecutor.setThreadGroupName("taskExecutor-batch");
    return threadPoolTaskExecutor;
}

@Bean(name = "stepToLoadData")
public Step stepToLoadData() {
    TaskletStep step = stepBuilderFactory.get("stepToLoadData")
            .transactionManager(jpaTransactionManager())
            .<List<FieldSet>, List<myCustomPayloadRecord>>chunk(chunkSize)
            .reader(myCustomFileItemReader(OVERRIDDEN_BY_EXPRESSION))
            .processor(myCustomPayloadRecordItemProcessor())
            .writer(myCustomerWriter())
            .faultTolerant()
            .skipPolicy(new AlwaysSkipItemSkipPolicy())
            .skip(DataValidationException.class)
            .listener(new CustomReaderListener())
            .listener(new CustomProcessListener())
            .listener(new CustomWriteListener())
            .listener(new CustomSkipListener())
            .taskExecutor(taskExecutor())
            .throttleLimit(maxThreads)
            .build();
    step.registerStepExecutionListener(stepExecutionListener());
    step.registerChunkListener(new CustomChunkListener());
    return step;
}

我的主要方法: Application.java

  @Autowired
    @Qualifier("jobToLoadTheData")
    private Job loadTheData;

    @Autowired
    private JobLauncher jobLauncher;

    @PostConstruct
    public void launchJob () throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException
    {
        JobParameters parameters = (new JobParametersBuilder()).addDate("random", new Date()).toJobParameters();
        jobLauncher.run(loadTheData, parameters);
    }

 public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
}

现在,通常我从 Amazon S3 存储桶中读取这个 .csv ,但由于我在本地测试,我只是将 .csv 放在项目目录中并通过触发作业直接读取它在 Application.java main class 中(如上所示)。此外,我确实在 BatchConfiguration class 中定义了一些其他 bean,但我不想让这个 post 比现在更复杂,而且我已经完成了谷歌搜索,问题可能出在我 posted(希望如此)的方法上。

此外,我想指出,与 Google/Whosebug 上的其他 post 之一类似,用户有类似的问题,我创建了一个 @RestController 端点只需将 .run() 方法调用 JobLauncher 并传入 JobToLoadTheData Bean,它就会触发批量插入。你猜怎么着? 数据保存到数据库就好了即使在 spring >= 2.2.1.

这是怎么回事?这是线索吗?某种类型的实体或交易管理器是否出了什么奇怪的问题?我会接受任何建议!我可以提供你们可能需要的任何更多信息,所以请问。

您正在定义一个 JobRepository 类型的 bean 并期望它被 Spring Batch 拾取。这是不正确的。您需要提供 BatchConfigurer 并覆盖 getJobRepository。这在 reference documentation:

中有解释
You can customize any of these beans by creating a custom implementation of the
BatchConfigurer interface. Typically, extending the DefaultBatchConfigurer
(which is provided if a BatchConfigurer is not found) and overriding the required
getter is sufficient.

这也记录在 @EnableBatchProcessingJavadoc 中。因此,在您的情况下,您需要定义一个类型为 Batchconfigurer 的 bean 并覆盖 getJobRepositorygetTransactionManager,例如:

@Bean
public BatchConfigurer batchConfigurer(EntityManagerFactory entityManagerFactory, DataSource dataSource) {
    return new DefaultBatchConfigurer(dataSource) {
        @Override
        public PlatformTransactionManager getTransactionManager() {
            return new JpaTransactionManager(entityManagerFactory);
        }

        @Override
        public JobRepository getJobRepository() {
            JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
            jobRepositoryFactoryBean.setDataSource(dataSource);
            jobRepositoryFactoryBean.setTransactionManager(getTransactionManager());
            // set other properties
            return jobRepositoryFactoryBean.getObject();
        }
    };
}

在 Spring 引导上下文中,如果需要,您还可以覆盖 org.springframework.boot.autoconfigure.batch.JpaBatchConfigurercreateTransactionManagercreateJobRepository 方法。