Spring Batch/Data JPA 应用程序在调用 JPA 存储库(save、saveAll)方法时未 persisting/saving 数据到 Postgres 数据库
Spring Batch/Data JPA application not persisting/saving data to Postgres database when calling JPA repository (save, saveAll) methods
我已经无计可施了。到目前为止,我 read/googled 无休止地尝试了所有 google/Whosebug post 有类似问题的解决方案(有很多)。有些看起来很有前途,但还没有对我有用;尽管我已经取得了一些进展并且我相信我走在正确的轨道上(我相信在这一点上它与事务管理器有关并且可能与 Spring Batch 与 Spring Data JPA 发生冲突) .
参考文献:
- JpaItemWriter: no transaction is in progress
类似于前面提到的posts,我有一个Spring Boot应用程序正在使用Spring批处理 和 Spring 数据 JPA。它从 .csv
文件中读取逗号分隔的数据,然后使用 JPA 存储库方法对数据库执行一些 processing/transformation 和 尝试 到 persist/save ,特别是这里 .saveAll()
(我也尝试了 .save()
方法,这做了同样的事情),因为我正在保存用户定义数据类型(批量插入)的 List<MyUserDefinedDataType>
。
现在,我的代码在 Spring Boot starter 1.5.9.RELEASE
上运行良好,但我最近尝试升级到 2.X.X,经过无数小时的调试后我发现,只有版本 2.2.0.RELEASE
会将 persist/save 数据存入数据库。所以升级到 >= 2.2.1.RELEASE
会破坏持久性。从 .csv
中读取的一切都很好,就在代码流第一次遇到像 .save()
.saveAll()
这样的 JPA 存储库方法时,应用程序保持 运行 但没有任何内容被保留.我还注意到 Hikari 池日志 "active=1 idle=4"
,但是当我在版本 1.5.9.RELEASE
上查看同一日志时,它在持久化数据后立即显示 active=0 idle=5
,因此应用程序肯定挂了。我进入调试器,甚至在跳入存储库调用后看到,它通过 Spring AOP 库等(所有第三方)进入几乎无限循环,我不相信会回到真正的application/business 我写的逻辑。
3c22fb53ed64 2021-05-20 23:53:43.909 DEBUG
[HikariPool-1 housekeeper] com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Pool stats (total=5, active=1, idle=4, waiting=0)
无论如何,我尝试了对其他人有效的最常见解决方案是:
- 定义
JpaTransactionManager
@Bean
并将其注入 Step
函数,同时使用 PlatformTransactionManager
保留 JobRepository
。这 没有 工作。然后我也尝试在 JobRepository @Bean
中使用 JpaTransactionManager
,这也 not 工作。
- 在我的应用程序中定义一个
@RestController
端点以手动触发此作业,而不是从我的主 Application.java
class 手动执行。 (我在下面更多地讨论这个)。根据我在上面 post 编辑的 post 之一,即使在 spring >= 2.2.1 上,数据也正确地保存到数据库中,我进一步怀疑 Spring批次persistence/entity/transaction经理搞砸了。
代码基本上是这样的:
BatchConfiguration.java
@Configuration
@EnableBatchProcessing
@Import({DatabaseConfiguration.class})
public class BatchConfiguration {
// Datasource is a Postgres DB defined in separate IntelliJ project that I add to my pom.xml
DataSource dataSource;
@Autowired
public BatchConfiguration(@Qualifier("dataSource") DataSource dataSource) {
this.dataSource = dataSource;
}
@Bean
@Primary
public JpaTransactionManager jpaTransactionManager() {
final JpaTransactionManager tm = new JpaTransactionManager();
tm.setDataSource(dataSource);
return tm;
}
@Bean
public JobRepository jobRepository(PlatformTransactionManager transactionManager) throws Exception {
JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
jobRepositoryFactoryBean.setDataSource(dataSource);
jobRepositoryFactoryBean.setTransactionManager(transactionManager);
jobRepositoryFactoryBean.setDatabaseType("POSTGRES");
return jobRepositoryFactoryBean.getObject();
}
@Bean
public JobLauncher jobLauncher(JobRepository jobRepository) {
SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
simpleJobLauncher.setJobRepository(jobRepository);
return simpleJobLauncher;
}
@Bean(name = "jobToLoadTheData")
public Job jobToLoadTheData() {
return jobBuilderFactory.get("jobToLoadTheData")
.start(stepToLoadData())
.listener(new CustomJobListener())
.build();
}
@Bean
@StepScope
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(maxThreads);
threadPoolTaskExecutor.setThreadGroupName("taskExecutor-batch");
return threadPoolTaskExecutor;
}
@Bean(name = "stepToLoadData")
public Step stepToLoadData() {
TaskletStep step = stepBuilderFactory.get("stepToLoadData")
.transactionManager(jpaTransactionManager())
.<List<FieldSet>, List<myCustomPayloadRecord>>chunk(chunkSize)
.reader(myCustomFileItemReader(OVERRIDDEN_BY_EXPRESSION))
.processor(myCustomPayloadRecordItemProcessor())
.writer(myCustomerWriter())
.faultTolerant()
.skipPolicy(new AlwaysSkipItemSkipPolicy())
.skip(DataValidationException.class)
.listener(new CustomReaderListener())
.listener(new CustomProcessListener())
.listener(new CustomWriteListener())
.listener(new CustomSkipListener())
.taskExecutor(taskExecutor())
.throttleLimit(maxThreads)
.build();
step.registerStepExecutionListener(stepExecutionListener());
step.registerChunkListener(new CustomChunkListener());
return step;
}
我的主要方法:
Application.java
@Autowired
@Qualifier("jobToLoadTheData")
private Job loadTheData;
@Autowired
private JobLauncher jobLauncher;
@PostConstruct
public void launchJob () throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException
{
JobParameters parameters = (new JobParametersBuilder()).addDate("random", new Date()).toJobParameters();
jobLauncher.run(loadTheData, parameters);
}
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
现在,通常我从 Amazon S3 存储桶中读取这个 .csv
,但由于我在本地测试,我只是将 .csv 放在项目目录中并通过触发作业直接读取它在 Application.java
main class 中(如上所示)。此外,我确实在 BatchConfiguration
class 中定义了一些其他 bean,但我不想让这个 post 比现在更复杂,而且我已经完成了谷歌搜索,问题可能出在我 posted(希望如此)的方法上。
此外,我想指出,与 Google/Whosebug 上的其他 post 之一类似,用户有类似的问题,我创建了一个 @RestController
端点只需将 .run()
方法调用 JobLauncher
并传入 JobToLoadTheData
Bean,它就会触发批量插入。你猜怎么着? 数据保存到数据库就好了,即使在 spring >= 2.2.1.
这是怎么回事?这是线索吗?某种类型的实体或交易管理器是否出了什么奇怪的问题?我会接受任何建议!我可以提供你们可能需要的任何更多信息,所以请问。
您正在定义一个 JobRepository
类型的 bean 并期望它被 Spring Batch 拾取。这是不正确的。您需要提供 BatchConfigurer
并覆盖 getJobRepository
。这在 reference documentation:
中有解释
You can customize any of these beans by creating a custom implementation of the
BatchConfigurer interface. Typically, extending the DefaultBatchConfigurer
(which is provided if a BatchConfigurer is not found) and overriding the required
getter is sufficient.
这也记录在 @EnableBatchProcessing
的 Javadoc 中。因此,在您的情况下,您需要定义一个类型为 Batchconfigurer
的 bean 并覆盖 getJobRepository
和 getTransactionManager
,例如:
@Bean
public BatchConfigurer batchConfigurer(EntityManagerFactory entityManagerFactory, DataSource dataSource) {
return new DefaultBatchConfigurer(dataSource) {
@Override
public PlatformTransactionManager getTransactionManager() {
return new JpaTransactionManager(entityManagerFactory);
}
@Override
public JobRepository getJobRepository() {
JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
jobRepositoryFactoryBean.setDataSource(dataSource);
jobRepositoryFactoryBean.setTransactionManager(getTransactionManager());
// set other properties
return jobRepositoryFactoryBean.getObject();
}
};
}
在 Spring 引导上下文中,如果需要,您还可以覆盖 org.springframework.boot.autoconfigure.batch.JpaBatchConfigurer
的 createTransactionManager
和 createJobRepository
方法。
我已经无计可施了。到目前为止,我 read/googled 无休止地尝试了所有 google/Whosebug post 有类似问题的解决方案(有很多)。有些看起来很有前途,但还没有对我有用;尽管我已经取得了一些进展并且我相信我走在正确的轨道上(我相信在这一点上它与事务管理器有关并且可能与 Spring Batch 与 Spring Data JPA 发生冲突) .
参考文献:
- JpaItemWriter: no transaction is in progress
类似于前面提到的posts,我有一个Spring Boot应用程序正在使用Spring批处理 和 Spring 数据 JPA。它从 .csv
文件中读取逗号分隔的数据,然后使用 JPA 存储库方法对数据库执行一些 processing/transformation 和 尝试 到 persist/save ,特别是这里 .saveAll()
(我也尝试了 .save()
方法,这做了同样的事情),因为我正在保存用户定义数据类型(批量插入)的 List<MyUserDefinedDataType>
。
现在,我的代码在 Spring Boot starter 1.5.9.RELEASE
上运行良好,但我最近尝试升级到 2.X.X,经过无数小时的调试后我发现,只有版本 2.2.0.RELEASE
会将 persist/save 数据存入数据库。所以升级到 >= 2.2.1.RELEASE
会破坏持久性。从 .csv
中读取的一切都很好,就在代码流第一次遇到像 .save()
.saveAll()
这样的 JPA 存储库方法时,应用程序保持 运行 但没有任何内容被保留.我还注意到 Hikari 池日志 "active=1 idle=4"
,但是当我在版本 1.5.9.RELEASE
上查看同一日志时,它在持久化数据后立即显示 active=0 idle=5
,因此应用程序肯定挂了。我进入调试器,甚至在跳入存储库调用后看到,它通过 Spring AOP 库等(所有第三方)进入几乎无限循环,我不相信会回到真正的application/business 我写的逻辑。
3c22fb53ed64 2021-05-20 23:53:43.909 DEBUG
[HikariPool-1 housekeeper] com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Pool stats (total=5, active=1, idle=4, waiting=0)
无论如何,我尝试了对其他人有效的最常见解决方案是:
- 定义
JpaTransactionManager
@Bean
并将其注入Step
函数,同时使用PlatformTransactionManager
保留JobRepository
。这 没有 工作。然后我也尝试在 JobRepository@Bean
中使用JpaTransactionManager
,这也 not 工作。 - 在我的应用程序中定义一个
@RestController
端点以手动触发此作业,而不是从我的主Application.java
class 手动执行。 (我在下面更多地讨论这个)。根据我在上面 post 编辑的 post 之一,即使在 spring >= 2.2.1 上,数据也正确地保存到数据库中,我进一步怀疑 Spring批次persistence/entity/transaction经理搞砸了。
代码基本上是这样的: BatchConfiguration.java
@Configuration
@EnableBatchProcessing
@Import({DatabaseConfiguration.class})
public class BatchConfiguration {
// Datasource is a Postgres DB defined in separate IntelliJ project that I add to my pom.xml
DataSource dataSource;
@Autowired
public BatchConfiguration(@Qualifier("dataSource") DataSource dataSource) {
this.dataSource = dataSource;
}
@Bean
@Primary
public JpaTransactionManager jpaTransactionManager() {
final JpaTransactionManager tm = new JpaTransactionManager();
tm.setDataSource(dataSource);
return tm;
}
@Bean
public JobRepository jobRepository(PlatformTransactionManager transactionManager) throws Exception {
JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
jobRepositoryFactoryBean.setDataSource(dataSource);
jobRepositoryFactoryBean.setTransactionManager(transactionManager);
jobRepositoryFactoryBean.setDatabaseType("POSTGRES");
return jobRepositoryFactoryBean.getObject();
}
@Bean
public JobLauncher jobLauncher(JobRepository jobRepository) {
SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
simpleJobLauncher.setJobRepository(jobRepository);
return simpleJobLauncher;
}
@Bean(name = "jobToLoadTheData")
public Job jobToLoadTheData() {
return jobBuilderFactory.get("jobToLoadTheData")
.start(stepToLoadData())
.listener(new CustomJobListener())
.build();
}
@Bean
@StepScope
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(maxThreads);
threadPoolTaskExecutor.setThreadGroupName("taskExecutor-batch");
return threadPoolTaskExecutor;
}
@Bean(name = "stepToLoadData")
public Step stepToLoadData() {
TaskletStep step = stepBuilderFactory.get("stepToLoadData")
.transactionManager(jpaTransactionManager())
.<List<FieldSet>, List<myCustomPayloadRecord>>chunk(chunkSize)
.reader(myCustomFileItemReader(OVERRIDDEN_BY_EXPRESSION))
.processor(myCustomPayloadRecordItemProcessor())
.writer(myCustomerWriter())
.faultTolerant()
.skipPolicy(new AlwaysSkipItemSkipPolicy())
.skip(DataValidationException.class)
.listener(new CustomReaderListener())
.listener(new CustomProcessListener())
.listener(new CustomWriteListener())
.listener(new CustomSkipListener())
.taskExecutor(taskExecutor())
.throttleLimit(maxThreads)
.build();
step.registerStepExecutionListener(stepExecutionListener());
step.registerChunkListener(new CustomChunkListener());
return step;
}
我的主要方法: Application.java
@Autowired
@Qualifier("jobToLoadTheData")
private Job loadTheData;
@Autowired
private JobLauncher jobLauncher;
@PostConstruct
public void launchJob () throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException
{
JobParameters parameters = (new JobParametersBuilder()).addDate("random", new Date()).toJobParameters();
jobLauncher.run(loadTheData, parameters);
}
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
现在,通常我从 Amazon S3 存储桶中读取这个 .csv
,但由于我在本地测试,我只是将 .csv 放在项目目录中并通过触发作业直接读取它在 Application.java
main class 中(如上所示)。此外,我确实在 BatchConfiguration
class 中定义了一些其他 bean,但我不想让这个 post 比现在更复杂,而且我已经完成了谷歌搜索,问题可能出在我 posted(希望如此)的方法上。
此外,我想指出,与 Google/Whosebug 上的其他 post 之一类似,用户有类似的问题,我创建了一个 @RestController
端点只需将 .run()
方法调用 JobLauncher
并传入 JobToLoadTheData
Bean,它就会触发批量插入。你猜怎么着? 数据保存到数据库就好了,即使在 spring >= 2.2.1.
这是怎么回事?这是线索吗?某种类型的实体或交易管理器是否出了什么奇怪的问题?我会接受任何建议!我可以提供你们可能需要的任何更多信息,所以请问。
您正在定义一个 JobRepository
类型的 bean 并期望它被 Spring Batch 拾取。这是不正确的。您需要提供 BatchConfigurer
并覆盖 getJobRepository
。这在 reference documentation:
You can customize any of these beans by creating a custom implementation of the
BatchConfigurer interface. Typically, extending the DefaultBatchConfigurer
(which is provided if a BatchConfigurer is not found) and overriding the required
getter is sufficient.
这也记录在 @EnableBatchProcessing
的 Javadoc 中。因此,在您的情况下,您需要定义一个类型为 Batchconfigurer
的 bean 并覆盖 getJobRepository
和 getTransactionManager
,例如:
@Bean
public BatchConfigurer batchConfigurer(EntityManagerFactory entityManagerFactory, DataSource dataSource) {
return new DefaultBatchConfigurer(dataSource) {
@Override
public PlatformTransactionManager getTransactionManager() {
return new JpaTransactionManager(entityManagerFactory);
}
@Override
public JobRepository getJobRepository() {
JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
jobRepositoryFactoryBean.setDataSource(dataSource);
jobRepositoryFactoryBean.setTransactionManager(getTransactionManager());
// set other properties
return jobRepositoryFactoryBean.getObject();
}
};
}
在 Spring 引导上下文中,如果需要,您还可以覆盖 org.springframework.boot.autoconfigure.batch.JpaBatchConfigurer
的 createTransactionManager
和 createJobRepository
方法。