如何从多个应用程序实例同时 运行 相同 spring 批处理作业
How to run same spring batch job at same time from multiple instances of application
我创建了一个 spring 批处理作业,它在定义的时间使用调度程序执行。我们有同一个应用程序的多个实例,所以同一个作业可以从多个实例同时执行。所有应用程序都使用相同的数据库。因此,我遇到了错误
Stack Trace: org.springframework.transaction.TransactionSystemException: Could not commit JDBC transaction; nested exception is org.postgresql.util.PSQLException: ERROR: could not serialize access due to read/write dependencies among transactions Detail: Reason code: Canceled on identification as a pivot, during commit attempt. Hint: The transaction might succeed if retried. at org.springframework.jdbc.datasource.DataSourceTransactionManager.doCommit(DataSourceTransactionManager.java:335) at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:743) at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711) at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:631) at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:385) at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:118) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) at org.springframework.batch.core.repository.support.AbstractJobRepositoryFactoryBean.invoke(AbstractJobRepositoryFactoryBean.java:181) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) at com.sun.proxy.$Proxy209.createJobExecution(Unknown Source) at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:137)
我已使用以下代码声明工作-
JobParameters params = new JobParametersBuilder()
.addString("JobID", UUID.randomUUID().toString(), true)
.toJobParameters();
jobLauncher.run(job, params);
并在 yaml 文件中做了以下配置
batch:
job:
enabled: false
initialize-schema: always
repository:
isolationlevelforcreate: ISOLATION_READ_COMMITTED
我添加了侦听器来检查任何作业 运行ning 是否处于 STARTED 状态,如果找到任何 运行ning 作业,当前作业将停止
public class SingleInstanceListener implements JobExecutionListener {
@Autowired
private JobExplorer explorer;
@Override
public void beforeJob(JobExecution jobExecution) {
String jobName = jobExecution.getJobInstance().getJobName();
Set<JobExecution> executions = explorer.findRunningJobExecutions(jobName);
if(executions.size() > 1) {
jobExecution.stop();
}
}
@Override
public void afterJob(JobExecution jobExecution) {
}
}
但我仍然遇到上述错误。如果只有一个应用程序实例,那么 job 运行ning 没问题。有什么方法可以从多个应用程序实例同时 运行 相同的作业,或者只允许一个作业从任何一个应用程序 运行
没有像您预期的那样考虑隔离级别 属性。要自定义隔离级别,您需要提供 JobRepository
到 BatchConfigurer
。这是一个简单的例子:
@Configuration
@EnableBatchProcessing
public class MyJobConfig extends BasicBatchConfigurer {
@Override
protected JobRepository createJobRepository() throws Exception {
JobRepositoryFactoryBean repositoryFactoryBean = new JobRepositoryFactoryBean();
repositoryFactoryBean.setIsolationLevelForCreate("ISOLATION_READ_COMMITTED");
// set other properties on the factory bean
repositoryFactoryBean.afterPropertiesSet();
return repositoryFactoryBean.getObject();
}
}
此处的文档对此进行了解释:Transaction Configuration for the JobRepository。
注:BasicBatchConfigurer
来自SpringBoot。如果你使用 Spring Batch without Spring Boot,你可以改为扩展 DefaultBatchConfigurer
。
我创建了一个 spring 批处理作业,它在定义的时间使用调度程序执行。我们有同一个应用程序的多个实例,所以同一个作业可以从多个实例同时执行。所有应用程序都使用相同的数据库。因此,我遇到了错误
Stack Trace: org.springframework.transaction.TransactionSystemException: Could not commit JDBC transaction; nested exception is org.postgresql.util.PSQLException: ERROR: could not serialize access due to read/write dependencies among transactions Detail: Reason code: Canceled on identification as a pivot, during commit attempt. Hint: The transaction might succeed if retried. at org.springframework.jdbc.datasource.DataSourceTransactionManager.doCommit(DataSourceTransactionManager.java:335) at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:743) at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711) at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:631) at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:385) at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:118) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) at org.springframework.batch.core.repository.support.AbstractJobRepositoryFactoryBean.invoke(AbstractJobRepositoryFactoryBean.java:181) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) at com.sun.proxy.$Proxy209.createJobExecution(Unknown Source) at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:137)
我已使用以下代码声明工作-
JobParameters params = new JobParametersBuilder()
.addString("JobID", UUID.randomUUID().toString(), true)
.toJobParameters();
jobLauncher.run(job, params);
并在 yaml 文件中做了以下配置
batch:
job:
enabled: false
initialize-schema: always
repository:
isolationlevelforcreate: ISOLATION_READ_COMMITTED
我添加了侦听器来检查任何作业 运行ning 是否处于 STARTED 状态,如果找到任何 运行ning 作业,当前作业将停止
public class SingleInstanceListener implements JobExecutionListener {
@Autowired
private JobExplorer explorer;
@Override
public void beforeJob(JobExecution jobExecution) {
String jobName = jobExecution.getJobInstance().getJobName();
Set<JobExecution> executions = explorer.findRunningJobExecutions(jobName);
if(executions.size() > 1) {
jobExecution.stop();
}
}
@Override
public void afterJob(JobExecution jobExecution) {
}
}
但我仍然遇到上述错误。如果只有一个应用程序实例,那么 job 运行ning 没问题。有什么方法可以从多个应用程序实例同时 运行 相同的作业,或者只允许一个作业从任何一个应用程序 运行
没有像您预期的那样考虑隔离级别 属性。要自定义隔离级别,您需要提供 JobRepository
到 BatchConfigurer
。这是一个简单的例子:
@Configuration
@EnableBatchProcessing
public class MyJobConfig extends BasicBatchConfigurer {
@Override
protected JobRepository createJobRepository() throws Exception {
JobRepositoryFactoryBean repositoryFactoryBean = new JobRepositoryFactoryBean();
repositoryFactoryBean.setIsolationLevelForCreate("ISOLATION_READ_COMMITTED");
// set other properties on the factory bean
repositoryFactoryBean.afterPropertiesSet();
return repositoryFactoryBean.getObject();
}
}
此处的文档对此进行了解释:Transaction Configuration for the JobRepository。
注:BasicBatchConfigurer
来自SpringBoot。如果你使用 Spring Batch without Spring Boot,你可以改为扩展 DefaultBatchConfigurer
。