Spring Batch executing job twice
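I created a simple batch application with Spring Boot that reads data from a database and sends the fetched items as a list to a REST API. Everything works, but it sends the data to the REST API twice. I have already checked the answers to other posts and tried spring.batch.job.enabled=false.
The batch configuration class is as follows: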
@Configuration
@EnableBatchProcessing
public class JobBatchConfig extends DefaultBatchConfigurer {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    @Override
    public void setDataSource(DataSource dataSource) {
        // Overridden so that the datasource is not set even if one exists.
        // initialize() will then use a Map-based JobRepository (instead of the database).
    }

    @Bean
    //@StepScope
    public ItemReader<ModelClass> modelItemReader() {
        JdbcPagingItemReader<ModelClass> reader = new JdbcPagingItemReader<ModelClass>();
        final SqlPagingQueryProviderFactoryBean sqlPagingQueryProviderFactoryBean =
                new SqlPagingQueryProviderFactoryBean();
        sqlPagingQueryProviderFactoryBean.setDataSource(dataSource);
        sqlPagingQueryProviderFactoryBean.setSelectClause(SQLQueries.SELECT_QUERY);
        sqlPagingQueryProviderFactoryBean.setFromClause(SQLQueries.FROM_CLAUSE);
        sqlPagingQueryProviderFactoryBean.setSortKey(SQLQueries.SORT_KEY);
        sqlPagingQueryProviderFactoryBean.setWhereClause(SQLQueries.WHERE_CONDITION);
        try {
            reader.setQueryProvider(sqlPagingQueryProviderFactoryBean.getObject());
        } catch (Exception e) {
            e.printStackTrace();
        }
        reader.setDataSource(dataSource);
        reader.setPageSize(100);
        reader.setRowMapper(new ModelClassRowMapper());
        return reader;
    }

    @Bean
    public ModelClassItemWriter modelClassItemWriter() {
        return new ModelClassItemWriter();
    }

    @Bean
    public Step fetchDataStep() {
        return stepBuilderFactory
                .get("fetchDataStep")
                .<ModelClass, ModelClass>chunk(50)
                .reader(modelItemReader())
                .writer(modelClassItemWriter())
                .build();
    }

    @Bean
    public Job fetchDataJob() {
        return jobBuilderFactory.get("fetchDatatJob")
                .start(fetchDataStep())
                .build();
    }
}
The writer class is as follows:
public class ModelClassItemWriter implements ItemWriter<ModelClass> {

    private static final Logger LOGGER = LoggerFactory.getLogger(ModelClassItemWriter.class);

    @Value("${rest.uri}")
    private String baseUrl;

    @Override
    public void write(List<? extends ModelClass> items) throws Exception {
        RestTemplate restTemplate = new RestTemplate();
        URI uri = new URI(baseUrl);
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        HttpEntity<List<? extends ModelClass>> request =
                new HttpEntity<List<? extends ModelClass>>(items, headers);
        ResponseEntity<String> result = restTemplate.postForEntity(uri, request, String.class);
        LOGGER.info("**********RESPONSE************" + result.getStatusCode());
    }
}
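Here, it sends the data to this REST API twice. The batch needs to run every 10 minutes, so I wrote a scheduler that runs the job every 10 minutes. The logs below show it executing twice.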
2021-05-14 20:50:00.070 INFO 84100 --- [ scheduling-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [fetchDataStep]
2021-05-14 20:50:32.833 INFO 84100 --- [ scheduling-1] o.s.batch.core.step.AbstractStep : Step: [fetchDataStep] executed in 32s763ms
2021-05-14 20:50:32.837 INFO 84100 --- [ scheduling-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=fetchDatatJob]] completed with the following parameters: [{JobID=1621005600052}] and the following status: [COMPLETED] in 32s775ms
2021-05-14 20:50:33.017 INFO 84100 --- [ scheduling-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=fetchDatatJob]] launched with the following parameters: [{JobID=1621005633015}]
2021-05-14 20:50:33.021 INFO 84100 --- [ scheduling-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [fetchDataStep]
2021-05-14 20:51:02.712 INFO 84100 --- [ scheduling-1] o.s.batch.core.step.AbstractStep : Step: [fetchDataStep] executed in 29s691ms
2021-05-14 20:51:02.716 INFO 84100 --- [ scheduling-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=fetchDatatJob]] completed with the following parameters: [{JobID=1621005633015}] and the following status: [COMPLETED] in 29s696ms
2021-05-14 21:00:00.061 INFO 84100 --- [ scheduling-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=fetchDatatJob]] launched with the following parameters: [{JobID=1621006200051}]
The scheduler code is as follows:
@Component
public class ScheduleBatchJobs {

    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    Job job;

    @Scheduled(cron = "* */10 * * * ?")
    public void perform() throws Exception {
        JobParameters params = new JobParametersBuilder()
                .addString("JobID", String.valueOf(System.currentTimeMillis()))
                .toJobParameters();
        jobLauncher.run(job, params);
    }
}
Can anyone help me resolve this issue?
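Your Spring Batch configuration looks fine. The problem is the cron expression.
The cron expression cron = "* */10 * * * ?"
has * in the seconds field, so it matches every second of every tenth minute (minute 0, 10, 20, ...) rather than once every 10 minutes. A new run does not start while the previous one is still executing, but each time the thread finishes the job while the clock is still inside that matching minute, the method is triggered again. For example: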
@Scheduled(cron = "* */5 * * * ?")
public void print() throws InterruptedException {
    Thread.sleep(10000);
    System.out.println("Hello: " + Instant.now());
}
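In this case print() is triggered every five minutes. However, each call finishes in 10 seconds, so print() is triggered again right after it completes, and it keeps being called for the rest of that minute. Here are the logs: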
Hello: 2021-05-15T15:25:10.013Z
Hello: 2021-05-15T15:25:21.005Z
Hello: 2021-05-15T15:25:32.001Z
Hello: 2021-05-15T15:25:43.002Z
Hello: 2021-05-15T15:25:54.005Z
Hello: 2021-05-15T15:30:10.009Z
Hello: 2021-05-15T15:30:21.001Z
Hello: 2021-05-15T15:30:32.002Z
Hello: 2021-05-15T15:30:43.001Z
Hello: 2021-05-15T15:30:54.005Z
Hello: 2021-05-15T15:31:05.002Z
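In your case the job completes in about 30 seconds, so the cron expression triggers perform() again because the time is still within the same 1-minute window.
Please change your cron expression to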
@Scheduled(cron = "0 0/10 * * * ?")
This triggers perform() once every 10 minutes, even if the job completes in less than a minute.
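To make the difference between the two expressions concrete, here is a small plain-Java simulation. It is only a sketch and does not use Spring: the class CronRerunSketch, the two predicates, and the helpers nextFire and simulate are hypothetical names made up for illustration. It assumes, as the logs in the question suggest, that the next fire time is computed only after the previous run has finished, and it models just the seconds and minutes fields of each expression.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class CronRerunSketch {

    // Models "* */10 * * * ?": any second, minutes 0, 10, 20, ...
    static final Predicate<LocalDateTime> EVERY_SECOND_OF_TENTH_MINUTE =
            t -> t.getMinute() % 10 == 0;

    // Models "0 0/10 * * * ?": second 0 only, minutes 0, 10, 20, ...
    static final Predicate<LocalDateTime> ONCE_PER_TENTH_MINUTE =
            t -> t.getSecond() == 0 && t.getMinute() % 10 == 0;

    // Returns the first whole second strictly after 'from' that the trigger matches.
    static LocalDateTime nextFire(Predicate<LocalDateTime> trigger, LocalDateTime from) {
        LocalDateTime t = from.withNano(0).plusSeconds(1);
        while (!trigger.test(t)) {
            t = t.plusSeconds(1);
        }
        return t;
    }

    // Collects the start time of every run in [start, end), assuming each run takes
    // jobSeconds and the next fire time is computed only after the run completes.
    static List<LocalDateTime> simulate(Predicate<LocalDateTime> trigger,
                                        LocalDateTime start,
                                        LocalDateTime end,
                                        int jobSeconds) {
        List<LocalDateTime> starts = new ArrayList<>();
        LocalDateTime fire = nextFire(trigger, start.minusSeconds(1));
        while (fire.isBefore(end)) {
            starts.add(fire);
            LocalDateTime finished = fire.plusSeconds(jobSeconds);
            fire = nextFire(trigger, finished);
        }
        return starts;
    }

    public static void main(String[] args) {
        LocalDateTime start = LocalDateTime.of(2021, 5, 14, 20, 50, 0);
        LocalDateTime end = start.plusMinutes(20);
        // A 32-second job, roughly the duration shown in the question's logs.
        System.out.println(simulate(EVERY_SECOND_OF_TENTH_MINUTE, start, end, 32));
        System.out.println(simulate(ONCE_PER_TENTH_MINUTE, start, end, 32));
    }
}
```

Under these assumptions, the first expression starts the 32-second job four times in the 20-minute window (20:50:00, 20:50:33, 21:00:00 and 21:00:33), which mirrors the pattern in the question's logs, while the second starts it only twice (20:50:00 and 21:00:00).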