spring 批次的 BATCH_JOB_INSTANCE TABLE 中的主键冲突,而 运行 作业并行

Primary Key Violation in spring batch's BATCH_JOB_INSTANCE TABLE while running job parallel

我正在使用 spring batch admin 3.0.3 with spring 3.2.0.Release with SQL Server 2008。当我尝试 运行 多个作业同时进行。 堆栈跟踪如下。

[ERROR] 2015-04-07 18:50:40.991 [org.springframework.scheduling.quartz.SchedulerFactoryBean#3_Worker-1] BatchJobScheduler executeInternal -
                    Violation of PRIMARY KEY constraint 'PK__BATCH_JO__4848154A7F60ED59'. Cannot insert duplicate key in object 'dbo.BATCH_JOB_INSTANCE'. The duplicate key value is (0).
org.hibernate.exception.ConstraintViolationException: Violation of PRIMARY KEY constraint 'PK__BATCH_JO__4848154A7F60ED59'. Cannot insert duplicate key in object 'dbo.BATCH_JOB_INSTANCE'. The duplicate key value is (0).
    at org.hibernate.exception.internal.SQLStateConversionDelegate.convert(SQLStateConversionDelegate.java:128) ~[hibernate-core-4.1.9.Final.jar:4.1.9.Final]
    at org.hibernate.exception.internal.StandardSQLExceptionConverter.convert(StandardSQLExceptionConverter.java:49) ~[hibernate-core-4.1.9.Final.jar:4.1.9.Final]
    at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:125) ~[hibernate-core-4.1.9.Final.jar:4.1.9.Final]
    at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:110) ~[hibernate-core-4.1.9.Final.jar:4.1.9.Final]
    at org.hibernate.engine.jdbc.internal.proxy.AbstractStatementProxyHandler.continueInvocation(AbstractStatementProxyHandler.java:129) ~[hibernate-core-4.1.9.Final.jar:4.1.9.Final]
    at org.hibernate.engine.jdbc.internal.proxy.AbstractProxyHandler.invoke(AbstractProxyHandler.java:81) ~[hibernate-core-4.1.9.Final.jar:4.1.9.Final]
    at com.sun.proxy.$Proxy70.executeUpdate(Unknown Source) ~[?:?]
    at org.springframework.jdbc.core.JdbcTemplate.doInPreparedStatement(JdbcTemplate.java:824) ~[spring-jdbc-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at org.springframework.jdbc.core.JdbcTemplate.doInPreparedStatement(JdbcTemplate.java:818) ~[spring-jdbc-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:589) ~[spring-jdbc-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:818) ~[spring-jdbc-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:874) ~[spring-jdbc-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:878) ~[spring-jdbc-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at org.springframework.batch.core.repository.dao.JdbcJobInstanceDao.createJobInstance(JdbcJobInstanceDao.java:115) ~[spring-batch-core-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.batch.core.repository.support.SimpleJobRepository.createJobExecution(SimpleJobRepository.java:135) ~[spring-batch-core-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.7.0_71]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[?:1.7.0_71]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.7.0_71]
    at java.lang.reflect.Method.invoke(Method.java:606) ~[?:1.7.0_71]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317) ~[spring-aop-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183) ~[spring-aop-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150) ~[spring-aop-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at org.springframework.transaction.interceptor.TransactionInterceptor.proceedWithInvocation(TransactionInterceptor.java:96) ~[spring-tx-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:260) ~[spring-tx-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:94) ~[spring-tx-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172) ~[spring-aop-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at org.springframework.batch.core.repository.support.AbstractJobRepositoryFactoryBean.invoke(AbstractJobRepositoryFactoryBean.java:172) ~[spring-batch-core-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172) ~[spring-aop-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204) ~[spring-aop-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at com.sun.proxy.$Proxy95.createJobExecution(Unknown Source) ~[?:?]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:125) ~[spring-batch-core-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at com.abc.testme.batchjobs.util.BatchJobUtils.runJob(BatchJobUtils.java:154) ~[BatchJobUtils.class:?]
    at com.abc.testme.batchjobs.util.BatchJobUtils.runJobWithCheckForRunningExecutions(BatchJobUtils.java:136) ~[BatchJobUtils.class:?]
    at com.abc.testme.batchjobs.util.BatchJobUtils.runJob(BatchJobUtils.java:80) ~[BatchJobUtils.class:?]
    at com.abc.testme.batchjobs.scheduler.BatchJobScheduler.executeInternal(BatchJobScheduler.java:52) [BatchJobScheduler.class:?]
    at org.springframework.scheduling.quartz.QuartzJobBean.execute(QuartzJobBean.java:113) [spring-context-support-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at org.quartz.core.JobRunShell.run(JobRunShell.java:216) [quartz-1.8.5.jar:?]
    at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:549) [quartz-1.8.5.jar:?]
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: Violation of PRIMARY KEY constraint 'PK__BATCH_JO__4848154A7F60ED59'. Cannot insert duplicate key in object 'dbo.BATCH_JOB_INSTANCE'. The duplicate key value is (0).
    at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:216) ~[sqljdbc4-4.0.jar:?]
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1515) ~[sqljdbc4-4.0.jar:?]
    at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.doExecutePreparedStatement(SQLServerPreparedStatement.java:404) ~[sqljdbc4-4.0.jar:?]
    at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement$PrepStmtExecCmd.doExecute(SQLServerPreparedStatement.java:350) ~[sqljdbc4-4.0.jar:?]
    at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:5696) ~[sqljdbc4-4.0.jar:?]
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:1715) ~[sqljdbc4-4.0.jar:?]
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:180) ~[sqljdbc4-4.0.jar:?]
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:155) ~[sqljdbc4-4.0.jar:?]
    at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeUpdate(SQLServerPreparedStatement.java:314) ~[sqljdbc4-4.0.jar:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.7.0_71]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[?:1.7.0_71]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.7.0_71]
    at java.lang.reflect.Method.invoke(Method.java:606) ~[?:1.7.0_71]
    at org.hibernate.engine.jdbc.internal.proxy.AbstractStatementProxyHandler.continueInvocation(AbstractStatementProxyHandler.java:122) ~[hibernate-core-4.1.9.Final.jar:4.1.9.Final]
    ... 33 more

我的理解是多个作业试图从同一个 table 执行 update/fetch 值,因此它的值是 duplicated.Please 请提供您的输入。但是,当我以 1 分钟的间隔 运行 这些作业时,我没有得到任何异常。感谢大家的阅读。

更新:这里是 transactionManager 和数据源的配置

会话工厂:

<bean id="sessionFactory"
        class="org.springframework.orm.hibernate4.LocalSessionFactoryBean">
        <property name="dataSource" ref="dataSource" />
        <property name="packagesToScan">
            <list>
                <value>com.abc.testme.model</value>
            </list>
        </property>

        <property name="hibernateProperties">
            <value>
                hibernate.dialect=${hibernate.dialect}
                hibernate.hbm2ddl.auto=${hibernate.hbm2ddl.auto}
                hibernate.show_sql=${hibernate.show_sql}
                hibernate.format.sql=${hibernate.format.sql}
                hibernate.query.substitutions=${hibernate.query.substitutions}
            </value>
            <!-- Turn batching off for better error messages under PostgreSQL -->
            <!-- hibernate.jdbc.batch_size=0 -->
        </property>

    </bean>

交易经理:

<bean id="transactionManager"
        class="org.springframework.orm.hibernate4.HibernateTransactionManager"
        p:sessionFactory-ref="sessionFactory">
</bean>

数据源:

<beans:bean id="dataSource"
        class="org.springframework.jdbc.datasource.DriverManagerDataSource">
        <beans:property name="driverClassName" value="${jdbc.driverClassName}" />
        <beans:property name="url" value="${jdbc.url}" />
        <beans:property name="username" value="${jdbc.username}" />
        <beans:property name="password" value="${jdbc.password}" />
</beans:bean>

我只是将我的配置更改为以下 属性 在 jobRepository 中添加一个 "isolationLevelForCreate" 解决了我的问题。发布它,以便如果有人遇到类似问题,这可能会有所帮助。

<bean id="jobRepository"
        class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
        <property name="dataSource" ref="dataSource" />
        <property name="transactionManager" ref="transactionManager" />
        <property name="databaseType" value="sqlserver" />
        <property name="isolationLevelForCreate" value="ISOLATION_REPEATABLE_READ" />
    </bean>

好吧,我们遇到了同样的问题,似乎已经能够通过配置解决它。

我仍然需要调查设置 ISOLATION_REPEATABLE_READ 是否足够,但在我这样做的同时,我还为 SQL-Server 启用了 DataFieldMaxValueIncrementer-Cache。

现在会一次抓取20个id保存在缓存中,大大降低了死锁的概率。 如果服务器停止,最多 20 个 ID 可能 "lost",并且在集群服务之间 ID 不会严格增加 - 但对我们来说没问题。

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

  private static final String ISOLATION_REPEATABLE_READ = "ISOLATION_REPEATABLE_READ";

  @Autowired
  private DataSource dataSource;
  @Autowired
  private PlatformTransactionManager platformTransactionManager;

  @Bean
  public JobRepository jobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setTransactionManager(platformTransactionManager);
    factory.setValidateTransactionState(true);
    factory.setIsolationLevelForCreate(ISOLATION_REPEATABLE_READ);
    factory.setIncrementerFactory(customIncrementerFactory());
    factory.afterPropertiesSet();
    return factory.getObject();
  }

  @Bean
  public SimpleJobLauncher jobLauncher(JobRepository jobRepository) {
    SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
    simpleJobLauncher.setJobRepository(jobRepository);
    return simpleJobLauncher;
  }

  private DataFieldMaxValueIncrementerFactory customIncrementerFactory() {
    return new CustomDataFieldMaxValueIncrementerFactory(dataSource);
  }

  private class CustomDataFieldMaxValueIncrementerFactory extends DefaultDataFieldMaxValueIncrementerFactory {

    CustomDataFieldMaxValueIncrementerFactory(DataSource dataSource) {
      super(dataSource);
    }

    @Override
    public DataFieldMaxValueIncrementer getIncrementer(String incrementerType, String incrementerName) {
      DataFieldMaxValueIncrementer incrementer = super.getIncrementer(incrementerType, incrementerName);
      if (incrementer instanceof SqlServerMaxValueIncrementer) {
        ((SqlServerMaxValueIncrementer) incrementer).setCacheSize(20);
      }
      return incrementer;
    }
  }
}

如果有更简单的方法来实现这一点,请随时为我指明方向:)

更新: 虽然上面解决了 "could not increment identity" 的错误,但我们现在面临其他问题:

Encountered fatal error executing job org.springframework.dao.DeadlockLoserDataAccessException: PreparedStatementCallback; SQL [SELECT STEP_EXECUTION_ID, STEP_NAME, START_TIME, END_TIME, STATUS, COMMIT_COUNT, READ_COUNT, FILTER_COUNT, WRITE_COUNT, EXIT_CODE, EXIT_MESSAGE, READ_SKIP_COUNT, WRITE_SKIP_COUNT, PROCESS_SKIP_COUNT, ROLLBACK_COUNT, LAST_UPDATED, VERSION from BATCH_STEP_EXECUTION where JOB_EXECUTION_ID = ? order by STEP_EXECUTION_ID]; Die Transaktion (Prozess-ID 477) befand sich auf Sperre | Kommunikationspuffer Ressourcen aufgrund eines anderen Prozesses in einer Deadlocksituation und wurde als Deadlockopfer ausgew?hlt. F?hren Sie die Transaktion erneut aus.; nested exception is com.microsoft.sqlserver.jdbc.SQLServerException: Die Transaktion (Prozess-ID 477) befand sich auf Sperre | Kommunikationspuffer Ressourcen aufgrund eines anderen Prozesses in einer Deadlocksituation und wurde als Deadlockopfer ausgew?hlt. F?hren Sie die Transaktion erneut aus. at org.springframework.jdbc.support.SQLErrorCodeSQLExceptionTranslator.doTranslate(SQLErrorCodeSQLExceptionTranslator.java:263) at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:73) at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:649) at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:684) at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:716) at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:726) at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:781) at org.springframework.batch.core.repository.dao.JdbcStepExecutionDao.addStepExecutions(JdbcStepExecutionDao.java:299) at org.springframework.batch.core.repository.support.SimpleJobRepository.getLastStepExecution(SimpleJobRepository.java:219) at sun.reflect.GeneratedMethodAccessor164.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) ... {noformat}

不同的作业现在试图同时插入数据库,导致死锁。我们现在将 IsolationLevel 设置为 "ISOLATION_READ_COMMITTED"。为了防止在集群上并行执行相同的作业,我们一直在使用 Hazelcast 锁。

https://jira.spring.io/browse/BATCH-2147

到目前为止,我无法使用 sql 服务器 IDENTITY 列而不是伪序列来重现任何问题(死锁或重复键)。无需使用重试,因为不再发生死锁。在我的场景中没有其他解决方案有效,我什至尝试使用 SQL Server 2012 Sequences。针对单个数据库使用多个批处理服务器实例 运行 进行了测试。用于测试的作业使用分区(使用 ThreadPoolTask​​Executor)。

创建以下 IDENTITY 类型的字段:

BATCH_JOB_INSTANCE.JOB_INSTANCE_ID
BATCH_JOB_EXECUTION.JOB_EXECUTION_ID
BATCH_STEP_EXECUTION.STEP_EXECUTION_ID

更改或复制 JdbcJobInstanceDao (SqlServerJdbcJobInstanceDao) 并将 CREATE_JOB_INSTANCE sql 常量和 createJobInstance 方法更改为:

private static final String CREATE_JOB_INSTANCE = "INSERT into %PREFIX%JOB_INSTANCE(JOB_NAME, JOB_KEY, VERSION)"
        + " values (?, ?, ?)";

@Override
public JobInstance createJobInstance(String jobName,
                                     JobParameters jobParameters) {

    Assert.notNull(jobName, "Job name must not be null.");
    Assert.notNull(jobParameters, "JobParameters must not be null.");

    Assert.state(getJobInstance(jobName, jobParameters) == null, "JobInstance must not already exist");

    JobInstance jobInstance = new JobInstance(null, jobName);
    jobInstance.incrementVersion();

    KeyHolder generatedKeyHolder = new GeneratedKeyHolder();

    getJdbcTemplate().update(connection -> {
        final PreparedStatement ps = connection.prepareStatement(getQuery(CREATE_JOB_INSTANCE), Statement.RETURN_GENERATED_KEYS);
        ps.setString(1, jobName);
        ps.setString(2, jobKeyGenerator.generateKey(jobParameters));
        ps.setString(3, String.valueOf(jobInstance.getVersion()));
        return ps;
    }, generatedKeyHolder);

    jobInstance.setId(generatedKeyHolder.getKey().longValue());

    return jobInstance;
}

更改或复制 JdbcJobExecutionDao (SqlServerJdbcJobExecutionDao) 并更改 SAVE_JOB_EXECUTION sql 常量和 saveJobExecution 方法:

private static final String SAVE_JOB_EXECUTION = "INSERT into %PREFIX%JOB_EXECUTION(JOB_INSTANCE_ID, START_TIME, "
        + "END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, VERSION, CREATE_TIME, LAST_UPDATED, JOB_CONFIGURATION_LOCATION) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";

@Override
public void saveJobExecution(JobExecution jobExecution) {

    validateJobExecution(jobExecution);

    jobExecution.incrementVersion();

    KeyHolder generatedKeyHolder = new GeneratedKeyHolder();

    getJdbcTemplate().update(connection -> {
        PreparedStatement ps = connection.prepareStatement(getQuery(SAVE_JOB_EXECUTION), Statement.RETURN_GENERATED_KEYS);
        ps.setLong  ( 1, jobExecution.getJobId());
        ps.setDate  ( 2, jobExecution.getStartTime() != null ? new java.sql.Date(jobExecution.getStartTime().getTime()) : null);
        ps.setDate  ( 3, jobExecution.getEndTime() != null ? new java.sql.Date(jobExecution.getEndTime().getTime()) : null);
        ps.setString( 4, jobExecution.getStatus().toString());
        ps.setString( 5, jobExecution.getExitStatus().getExitCode());
        ps.setString( 6, jobExecution.getExitStatus().getExitDescription());
        ps.setInt   ( 7, jobExecution.getVersion());
        ps.setDate  ( 8, jobExecution.getCreateTime() != null ? new java.sql.Date(jobExecution.getCreateTime().getTime()) : null);
        ps.setDate  ( 9, jobExecution.getLastUpdated() != null ? new java.sql.Date(jobExecution.getLastUpdated().getTime()) : null);
        ps.setString(10, jobExecution.getJobConfigurationName());
        return ps;
    }, generatedKeyHolder);

    jobExecution.setId(generatedKeyHolder.getKey().longValue());

    insertJobParameters(jobExecution.getId(), jobExecution.getJobParameters());
}

更改或复制 JdbcStepExecutionDao (SqlServerJdbcStepExecutionDao) 并更改 SAVE_STEP_EXECUTION sql 常量和 saveStepExecution/saveStepExecutions 方法:

private static final String SAVE_STEP_EXECUTION = "INSERT into %PREFIX%STEP_EXECUTION(VERSION, STEP_NAME, JOB_EXECUTION_ID, START_TIME, "
        + "END_TIME, STATUS, COMMIT_COUNT, READ_COUNT, FILTER_COUNT, WRITE_COUNT, EXIT_CODE, EXIT_MESSAGE, READ_SKIP_COUNT, WRITE_SKIP_COUNT, PROCESS_SKIP_COUNT, ROLLBACK_COUNT, LAST_UPDATED) "
        + "values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";

@Override
public void saveStepExecution(StepExecution stepExecution) {

    stepExecution.incrementVersion();

    final KeyHolder generatedKeyHolder = new GeneratedKeyHolder();

    getJdbcTemplate().update(connection -> {
        PreparedStatement ps = connection.prepareStatement(getQuery(SAVE_STEP_EXECUTION), Statement.RETURN_GENERATED_KEYS);
        ps.setInt   ( 1, stepExecution.getVersion());
        ps.setString( 2, stepExecution.getStepName());
        ps.setLong  ( 3, stepExecution.getJobExecutionId());
        ps.setDate  ( 4, stepExecution.getStartTime() != null ? new Date(stepExecution.getStartTime().getTime()) : null);
        ps.setDate  ( 5, stepExecution.getEndTime() != null ? new Date(stepExecution.getEndTime().getTime()) : null);
        ps.setString( 6, stepExecution.getStatus().toString());
        ps.setInt   ( 7, stepExecution.getCommitCount());
        ps.setInt   ( 8, stepExecution.getReadCount());
        ps.setInt   ( 9, stepExecution.getFilterCount());
        ps.setInt   (10, stepExecution.getWriteCount());
        ps.setString(11, stepExecution.getExitStatus().getExitCode());
        ps.setString(12, truncateExitDescription(stepExecution.getExitStatus().getExitDescription()));
        ps.setInt   (13, stepExecution.getReadSkipCount());
        ps.setInt   (14, stepExecution.getWriteSkipCount());
        ps.setInt   (15, stepExecution.getProcessSkipCount());
        ps.setInt   (16, stepExecution.getRollbackCount());
        ps.setDate  (17, stepExecution.getLastUpdated() != null ? new Date(stepExecution.getLastUpdated().getTime()) : null);
        return ps;
    }, generatedKeyHolder);

    stepExecution.setId(generatedKeyHolder.getKey().longValue());
}

@Override
public void saveStepExecutions(final Collection<StepExecution> stepExecutions) {
    Assert.notNull(stepExecutions, "Attempt to save a null collection of step executions");
    for (StepExecution stepExecution : stepExecutions) {
        saveStepExecution(stepExecution);
    }
}

通过以下更改更改或创建 JobRepositoryFactoryBean (SqlServerJobRepositoryFactoryBean) 的副本:

@Override
protected JobInstanceDao createJobInstanceDao() throws Exception {
    SqlServerJdbcJobInstanceDao dao = new SqlServerJdbcJobInstanceDao();
    dao.setJdbcTemplate(jdbcOperations);
    dao.setJobIncrementer(incrementerFactory.getIncrementer(databaseType, tablePrefix + "JOB_SEQ"));
    dao.setTablePrefix(tablePrefix);
    dao.afterPropertiesSet();
    return dao;
}

@Override
protected JobExecutionDao createJobExecutionDao() throws Exception {
    SqlServerJdbcJobExecutionDao dao = new SqlServerJdbcJobExecutionDao();
    dao.setJdbcTemplate(jdbcOperations);
    dao.setJobExecutionIncrementer(incrementerFactory.getIncrementer(databaseType, tablePrefix
            + "JOB_EXECUTION_SEQ"));
    dao.setTablePrefix(tablePrefix);
    dao.setClobTypeToUse(determineClobTypeToUse(this.databaseType));
    dao.setExitMessageLength(maxVarCharLength);
    dao.afterPropertiesSet();
    return dao;
}

@Override
protected StepExecutionDao createStepExecutionDao() throws Exception {
    SqlServerJdbcStepExecutionDao dao = new SqlServerJdbcStepExecutionDao();
    dao.setJdbcTemplate(jdbcOperations);
    dao.setStepExecutionIncrementer(incrementerFactory.getIncrementer(databaseType, tablePrefix
            + "STEP_EXECUTION_SEQ"));
    dao.setTablePrefix(tablePrefix);
    dao.setClobTypeToUse(determineClobTypeToUse(this.databaseType));
    dao.setExitMessageLength(maxVarCharLength);
    dao.afterPropertiesSet();
    return dao;
}

创建批处理配置以使用使用新 SqlServerJobRepositoryFactoryBean 的新 SqlServerBatchConfigurer:

@Configuration
public class BatchConfiguration {

    @Bean
    public SqlServerBatchConfigurer basicBatchConfigurer(BatchProperties properties, DataSource dataSource) {
        return new SqlServerBatchConfigurer(properties, dataSource);
    }

    class SqlServerBatchConfigurer extends BasicBatchConfigurer {

        private final DataSource dataSource;
        private final BatchProperties properties;

        SqlServerBatchConfigurer(final BatchProperties properties, final DataSource dataSource) {
            super(properties, dataSource);
            this.properties = properties;
            this.dataSource = dataSource;
        }

        @Override
        protected JobRepository createJobRepository() throws Exception {
            SqlServerJobRepositoryFactoryBean factory = new SqlServerJobRepositoryFactoryBean();

            // this is required to avoid deadlocks
            factory.setIsolationLevelForCreate("ISOLATION_REPEATABLE_READ");

            factory.setDataSource(this.dataSource);
            String tablePrefix = this.properties.getTablePrefix();
            if (StringUtils.hasText(tablePrefix)) {
                factory.setTablePrefix(tablePrefix);
            }
            factory.setTransactionManager(getTransactionManager());
            factory.afterPropertiesSet();
            return factory.getObject();
        }
    }
}