我应该如何使用 .tasklet() / .chunk() 来成功完成工作?
How should I use .tasklet() / .chunk() to finish job succesfully?
我使用 Spring 批处理从源数据库克隆 table 到目标数据库。该作业是使用 jobLauncher
和传递参数从服务层手动启动的。
一切都很好,但是在步骤描述中使用带有 .chunk(10)
的当前配置(如下)我只克隆了 10 行和 Caused by: java.sql.SQLException: Result set already closed
异常。
如何正确描述步骤以完成读取-> 将整个 table 从源数据库写入目标数据库?
@Configuration
@EnableBatchProcessing
public class DatasetProcessingContext {
private static final String OVERRIDEN_BY_JOB_PARAMETER = null;
private static final String DATASET_PROCESSING_STEP = "datasetProcessingStep";
private static final String DATASET_PROCESSING_JOB = "datasetProcessingJob";
public static final String SUBSYSTEM = "subsystem";
public static final String SQL = "sql";
public static final String SOURCE_DATASOURCE = "sourceDatasource";
public static final String INSERT_QUERY = "insertQuery";
public static final String TARGET_DATASOURCE = "targetDatasource";
@Autowired
@Qualifier(DEV_DATA_SOURCE)
private DataSource devDataSource;
//set of datasources
@Autowired
private PlatformTransactionManager transactionManager;
@SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
@Autowired
private Map<String, TableMessageDataRowMapper> tableMessageDataRowMappers;
@SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
@Autowired
private Map<String, TableMessageDataPreparedStatementSetter> messageDataPreparedStatementSetters;
@Autowired
private JobBuilderFactory jobsFactory;
@Autowired
private StepBuilderFactory stepsFactory;
@Bean
public JobRepository jobRepository() throws Exception {
return new MapJobRepositoryFactoryBean(transactionManager).getObject();
}
@Bean
public JobRegistry jobRegistry() {
return new MapJobRegistry();
}
@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() {
JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
postProcessor.setJobRegistry(jobRegistry());
return postProcessor;
}
@Bean
public JobLauncher jobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository());
return jobLauncher;
}
@Bean
public static StepScope stepScope() {
return new StepScope();
}
@Bean
@SuppressWarnings("unchecked")
@Scope(value = "step", proxyMode = ScopedProxyMode.INTERFACES)
public ItemStreamReader jdbcReader(@Value("#{jobParameters['" + SUBSYSTEM + "']}") String subsystem,
@Value("#{jobParameters['" + SQL + "']}") String sql,
@Value("#{jobParameters['" + SOURCE_DATASOURCE + "']}") String sourceDatasource) {
JdbcCursorItemReader jdbcCursorItemReader = new JdbcCursorItemReader();
jdbcCursorItemReader.setDataSource(getDataSourceFromEnum(TargetDataSource.valueOf(sourceDatasource)));
jdbcCursorItemReader.setSql(sql);
jdbcCursorItemReader.setRowMapper((RowMapper) tableMessageDataRowMappers
.get(subsystem + TABLE_MESSAGE_DATA_ROW_MAPPER));
return jdbcCursorItemReader;
}
@Bean
@SuppressWarnings("unchecked")
@Scope(value = "step", proxyMode = ScopedProxyMode.INTERFACES)
public ItemWriter jdbcWriter(@Value("#{jobParameters['" + SUBSYSTEM + "']}") String subsystem,
@Value("#{jobParameters['" + INSERT_QUERY + "']}") String insertQuery,
@Value("#{jobParameters['" + TARGET_DATASOURCE + "']}") String targetDatasource) {
JdbcBatchItemWriter jdbcWriter = new JdbcBatchItemWriter();
jdbcWriter.setDataSource(getDataSourceFromEnum(TargetDataSource.valueOf(targetDatasource)));
jdbcWriter.setSql(insertQuery);
jdbcWriter.setItemPreparedStatementSetter(messageDataPreparedStatementSetters
.get(subsystem + TABLE_MESSAGE_DATA_PREPARED_STATEMENT_SETTER));
return jdbcWriter;
}
@Bean
@SuppressWarnings("unchecked")
public Step datasetProcessingStep() {
return stepsFactory.get(DATASET_PROCESSING_STEP)
// should I create Tasklet or chunk with some CompletionPolicy?
.chunk(10)
.reader(jdbcReader(OVERRIDEN_BY_JOB_PARAMETER, OVERRIDEN_BY_JOB_PARAMETER, OVERRIDEN_BY_JOB_PARAMETER))
.writer(jdbcWriter(OVERRIDEN_BY_JOB_PARAMETER, OVERRIDEN_BY_JOB_PARAMETER, OVERRIDEN_BY_JOB_PARAMETER))
.allowStartIfComplete(true)
.build();
}
@Bean
public Job datasetProcessingJob() {
return jobsFactory.get(DATASET_PROCESSING_JOB).start(datasetProcessingStep()).build();
}
在步骤描述中使用.chunk(new DefaultResultCompletionPolicy())
适合我的情况。此策略 returns true
来自 isComplete(RepeatContext context, RepeatStatus result)
以防出现空结果 - 而不是 ResultSet 结束。
我使用 Spring 批处理从源数据库克隆 table 到目标数据库。该作业是使用 jobLauncher
和传递参数从服务层手动启动的。
一切都很好,但是在步骤描述中使用带有 .chunk(10)
的当前配置(如下)我只克隆了 10 行和 Caused by: java.sql.SQLException: Result set already closed
异常。
如何正确描述步骤以完成读取-> 将整个 table 从源数据库写入目标数据库?
@Configuration
@EnableBatchProcessing
public class DatasetProcessingContext {
private static final String OVERRIDEN_BY_JOB_PARAMETER = null;
private static final String DATASET_PROCESSING_STEP = "datasetProcessingStep";
private static final String DATASET_PROCESSING_JOB = "datasetProcessingJob";
public static final String SUBSYSTEM = "subsystem";
public static final String SQL = "sql";
public static final String SOURCE_DATASOURCE = "sourceDatasource";
public static final String INSERT_QUERY = "insertQuery";
public static final String TARGET_DATASOURCE = "targetDatasource";
@Autowired
@Qualifier(DEV_DATA_SOURCE)
private DataSource devDataSource;
//set of datasources
@Autowired
private PlatformTransactionManager transactionManager;
@SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
@Autowired
private Map<String, TableMessageDataRowMapper> tableMessageDataRowMappers;
@SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
@Autowired
private Map<String, TableMessageDataPreparedStatementSetter> messageDataPreparedStatementSetters;
@Autowired
private JobBuilderFactory jobsFactory;
@Autowired
private StepBuilderFactory stepsFactory;
@Bean
public JobRepository jobRepository() throws Exception {
return new MapJobRepositoryFactoryBean(transactionManager).getObject();
}
@Bean
public JobRegistry jobRegistry() {
return new MapJobRegistry();
}
@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() {
JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
postProcessor.setJobRegistry(jobRegistry());
return postProcessor;
}
@Bean
public JobLauncher jobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository());
return jobLauncher;
}
@Bean
public static StepScope stepScope() {
return new StepScope();
}
@Bean
@SuppressWarnings("unchecked")
@Scope(value = "step", proxyMode = ScopedProxyMode.INTERFACES)
public ItemStreamReader jdbcReader(@Value("#{jobParameters['" + SUBSYSTEM + "']}") String subsystem,
@Value("#{jobParameters['" + SQL + "']}") String sql,
@Value("#{jobParameters['" + SOURCE_DATASOURCE + "']}") String sourceDatasource) {
JdbcCursorItemReader jdbcCursorItemReader = new JdbcCursorItemReader();
jdbcCursorItemReader.setDataSource(getDataSourceFromEnum(TargetDataSource.valueOf(sourceDatasource)));
jdbcCursorItemReader.setSql(sql);
jdbcCursorItemReader.setRowMapper((RowMapper) tableMessageDataRowMappers
.get(subsystem + TABLE_MESSAGE_DATA_ROW_MAPPER));
return jdbcCursorItemReader;
}
@Bean
@SuppressWarnings("unchecked")
@Scope(value = "step", proxyMode = ScopedProxyMode.INTERFACES)
public ItemWriter jdbcWriter(@Value("#{jobParameters['" + SUBSYSTEM + "']}") String subsystem,
@Value("#{jobParameters['" + INSERT_QUERY + "']}") String insertQuery,
@Value("#{jobParameters['" + TARGET_DATASOURCE + "']}") String targetDatasource) {
JdbcBatchItemWriter jdbcWriter = new JdbcBatchItemWriter();
jdbcWriter.setDataSource(getDataSourceFromEnum(TargetDataSource.valueOf(targetDatasource)));
jdbcWriter.setSql(insertQuery);
jdbcWriter.setItemPreparedStatementSetter(messageDataPreparedStatementSetters
.get(subsystem + TABLE_MESSAGE_DATA_PREPARED_STATEMENT_SETTER));
return jdbcWriter;
}
@Bean
@SuppressWarnings("unchecked")
public Step datasetProcessingStep() {
return stepsFactory.get(DATASET_PROCESSING_STEP)
// should I create Tasklet or chunk with some CompletionPolicy?
.chunk(10)
.reader(jdbcReader(OVERRIDEN_BY_JOB_PARAMETER, OVERRIDEN_BY_JOB_PARAMETER, OVERRIDEN_BY_JOB_PARAMETER))
.writer(jdbcWriter(OVERRIDEN_BY_JOB_PARAMETER, OVERRIDEN_BY_JOB_PARAMETER, OVERRIDEN_BY_JOB_PARAMETER))
.allowStartIfComplete(true)
.build();
}
@Bean
public Job datasetProcessingJob() {
return jobsFactory.get(DATASET_PROCESSING_JOB).start(datasetProcessingStep()).build();
}
在步骤描述中使用.chunk(new DefaultResultCompletionPolicy())
适合我的情况。此策略 returns true
来自 isComplete(RepeatContext context, RepeatStatus result)
以防出现空结果 - 而不是 ResultSet 结束。