Testing a Spring Batch job produces unexpected results
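I am trying to create a simple backup job to verify that my approach is correct.
I am using spring-batch-test to verify the results.
P.S. My batch setup does not use the default configuration provided by the framework, because our job repository has to use a non-default schema name.
The reader of my job's step is configured for late binding with the @StepScope annotation. This is required because the job takes parameters that the reader uses to query the database.
This is the sample configuration we are using. It lives in the root package; the rest of the batch configuration is in subpackages.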
    @Configuration
    @Import({ApplicationHibernateConfiguration.class})
    @ComponentScan
    public class ApplicationBatchConfiguration extends DefaultBatchConfigurer {

        // The batch metadata tables live in a non-default schema.
        private static final String BATCH_PROCESSING_PREFIX = "BATCH_PROCESSING.BATCH_";

        private final DataSource dataSource;
        private final PlatformTransactionManager transactionManager;
        private final JobLauncher jobLauncher;
        private final JobRepository jobRepository;
        private final JobExplorer jobExplorer;

        @Autowired
        public ApplicationBatchConfiguration(
                DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
            this.dataSource = dataSource;
            this.transactionManager = transactionManager;
            // The repository must be created first: the launcher depends on it.
            this.jobRepository = createJobRepository();
            this.jobLauncher = createJobLauncher();
            this.jobExplorer = createJobExplorer();
        }

        @Override
        protected JobLauncher createJobLauncher() throws Exception {
            SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
            jobLauncher.setJobRepository(jobRepository);
            jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
            jobLauncher.afterPropertiesSet();
            return jobLauncher;
        }

        @Override
        protected JobRepository createJobRepository() throws Exception {
            JobRepositoryFactoryBean factoryBean = new JobRepositoryFactoryBean();
            factoryBean.setDatabaseType("DB2");
            factoryBean.setTablePrefix(BATCH_PROCESSING_PREFIX);
            factoryBean.setIsolationLevelForCreate("ISOLATION_REPEATABLE_READ");
            factoryBean.setDataSource(this.dataSource);
            factoryBean.setTransactionManager(this.transactionManager);
            factoryBean.afterPropertiesSet();
            return factoryBean.getObject();
        }

        @Override
        protected JobExplorer createJobExplorer() throws Exception {
            JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
            factoryBean.setDataSource(this.dataSource);
            factoryBean.setTablePrefix(BATCH_PROCESSING_PREFIX);
            factoryBean.afterPropertiesSet();
            return factoryBean.getObject();
        }

        @Override
        @Bean
        public JobRepository getJobRepository() {
            return jobRepository;
        }

        @Override
        public PlatformTransactionManager getTransactionManager() {
            return transactionManager;
        }

        @Override
        @Bean
        public JobLauncher getJobLauncher() {
            return jobLauncher;
        }

        @Override
        @Bean
        public JobExplorer getJobExplorer() {
            return jobExplorer;
        }

        @Bean
        public JobBuilderFactory jobBuilderFactory(JobRepository jobRepository) {
            return new JobBuilderFactory(jobRepository);
        }

        @Bean
        public StepBuilderFactory stepBuilderFactory(
                JobRepository jobRepository, PlatformTransactionManager transactionManager) {
            return new StepBuilderFactory(jobRepository, transactionManager);
        }
    }
The step I am trying to use looks like this:
    @Bean
    @StepScope
    public JdbcPagingItemReader<DomainObject> itemReader(
            @Value("#{jobParameters['id']}") String id) {
        JdbcPagingItemReader<DomainObject> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource);
        reader.setFetchSize(10);

        Db2PagingQueryProvider nativeQueryProvider = new Db2PagingQueryProvider();
        nativeQueryProvider.setSelectClause("*");
        nativeQueryProvider.setFromClause("from SCHEMA.DOMAIN");
        nativeQueryProvider.setWhereClause("id = :id");

        // Bind the late-bound job parameter to the named parameter in the where clause.
        Map<String, Object> params = new HashMap<>(1);
        params.put("id", id);

        reader.setRowMapper((rs, rowNum) -> {
            DomainObject element = new DomainObject();
            element.setId(rs.getString("ID"));
            return element;
        });
        reader.setParameterValues(params);
        reader.setQueryProvider(nativeQueryProvider);
        return reader;
    }

    @Bean
    public Step fetchDomain() throws Exception {
        return stepBuilderFactory.get("fetchDomain")
                .<DomainObject, DomainObject>chunk(10)
                .faultTolerant()
                // null is a placeholder; the real value is injected at step execution time
                .reader(itemReader(null))
                .writer(items -> items.forEach(System.out::println))
                .build();
    }
The actual job bean is currently configured to run only this single step:
    @Bean
    public Job backupJob() throws Exception {
        return jobBuilderFactory.get("backupJob")
                .start(fetchDomain())
                .build();
    }
My test code looks like this:
    @RunWith(SpringRunner.class)
    @SpringBatchTest
    @ContextConfiguration(classes = {ApplicationBatchConfiguration.class})
    public class BackupJobConfigurationTest {

        @Autowired
        @Qualifier(value = "backupJob")
        public Job job;

        @Autowired
        private JobLauncherTestUtils jobLauncherTestUtils;

        @Test
        public void flowTest() throws Exception {
            JobParameters parameters = new JobParametersBuilder()
                    .addString("id", "124")
                    .toJobParameters();
            JobExecution execution = jobLauncherTestUtils.launchJob(parameters);
            // Fails: the actual exit code is UNKNOWN.
            assertEquals(ExitStatus.COMPLETED.getExitCode(), execution.getExitStatus().getExitCode());
        }
    }
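I expect the exit code to be "COMPLETED", but I get "UNKNOWN".
I am also not sure the job code is actually invoked, because I never see any output from the writer lambda.
The only output I see in the test is: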
Aug 30, 2019 2:52:17 PM org.springframework.batch.core.launch.support.SimpleJobLauncher run
INFO: Job: [FlowJob: [name=backupJob]] launched with the following parameters: [{id=124}]
org.junit.ComparisonFailure:
Expected :COMPLETED
Actual :UNKNOWN
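I figured it out. First, I had to remove the SimpleAsyncTaskExecutor from my configuration so that the job actually runs in the same thread as the test. With the asynchronous executor, the launcher returns the JobExecution before the job has finished, which is why the exit code was still UNKNOWN. Second, after reading the reference documentation more carefully, I reworked my batch configuration: instead of having the configuration class extend the batch configurer directly, I now declare a BatchConfigurer as a bean in my configuration and add the @EnableBatchProcessing annotation.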
    @Bean
    public BatchConfigurer batchConfigurer() {
        return new DefaultBatchConfigurer() {

            @Override
            protected JobRepository createJobRepository() throws Exception {
                JobRepositoryFactoryBean factoryBean = new JobRepositoryFactoryBean();
                factoryBean.setDatabaseType("db2");
                factoryBean.setTablePrefix(BATCH_PROCESSING_PREFIX);
                factoryBean.setIsolationLevelForCreate("ISOLATION_REPEATABLE_READ");
                factoryBean.setDataSource(dataSource);
                factoryBean.setTransactionManager(transactionManager);
                factoryBean.afterPropertiesSet();
                return factoryBean.getObject();
            }

            @Override
            protected JobExplorer createJobExplorer() throws Exception {
                JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
                factoryBean.setDataSource(dataSource);
                factoryBean.setTablePrefix(BATCH_PROCESSING_PREFIX);
                factoryBean.afterPropertiesSet();
                return factoryBean.getObject();
            }

            @Override
            protected JobLauncher createJobLauncher() throws Exception {
                SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
                jobLauncher.setJobRepository(getJobRepository());
                //jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
                jobLauncher.afterPropertiesSet();
                return jobLauncher;
            }
        };
    }
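To illustrate the threading part of the fix, here is a minimal plain-Java sketch. It is not Spring Batch code: the class LauncherThreadingSketch, the Execution type, and the gate latch are all hypothetical stand-ins, and the gate only exists to simulate a job that takes longer than the launch call. Under those assumptions, it shows how a caller that reads the status right after launching sees UNKNOWN with an asynchronous executor and COMPLETED with a synchronous one.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

public class LauncherThreadingSketch {

    /** Hypothetical stand-in for a job execution: starts as UNKNOWN, ends as COMPLETED. */
    static final class Execution {
        final AtomicReference<String> exitCode = new AtomicReference<>("UNKNOWN");
        final CountDownLatch finished = new CountDownLatch(1);
    }

    /** Hands the job body to the executor and returns the execution without waiting for it. */
    static Execution launch(Executor executor, CountDownLatch gate) {
        Execution execution = new Execution();
        executor.execute(() -> {
            try {
                gate.await(); // simulated work: blocks until the gate is opened
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            execution.exitCode.set("COMPLETED");
            execution.finished.countDown();
        });
        return execution;
    }

    /** Returns the exit code the caller observes immediately after launch() returns. */
    static String exitCodeSeenRightAfterLaunch(boolean async) {
        // Async: the gate stays closed until the caller has read the status.
        // Sync: the gate is already open, otherwise the calling thread would block itself.
        CountDownLatch gate = new CountDownLatch(async ? 1 : 0);
        Executor executor = async
                ? task -> new Thread(task).start() // new thread per task
                : Runnable::run;                   // runs in the calling thread
        Execution execution = launch(executor, gate);
        String seen = execution.exitCode.get();    // what a test would assert on
        gate.countDown();                          // let the async job finish
        try {
            execution.finished.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("async: " + exitCodeSeenRightAfterLaunch(true));  // async: UNKNOWN
        System.out.println("sync:  " + exitCodeSeenRightAfterLaunch(false)); // sync:  COMPLETED
    }
}
```

This matches what I saw in the test: the assertion ran while the job was still in progress on another thread, so neither the final status nor the writer output was available yet.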