Spring Batch executing job twice

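I created a simple batch application with Spring Boot that reads data from a database and sends the fetched items as a list to a REST API. Everything works, but it sends the data to the REST API twice. I checked the answers in other posts and tried spring.batch.job.enabled=false.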

The batch configuration class is as follows:

@Configuration
@EnableBatchProcessing
public class JobBatchConfig extends DefaultBatchConfigurer {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    @Override
    public void setDataSource(DataSource dataSource) {
        // Overridden so that no DataSource is set, even if one exists.
        // initialize() will then use a Map-based JobRepository (instead of the database).
    }

    @Bean
    //@StepScope
    public ItemReader<ModelClass> modelItemReader() {
        JdbcPagingItemReader<ModelClass> reader = new JdbcPagingItemReader<ModelClass>();
        final SqlPagingQueryProviderFactoryBean sqlPagingQueryProviderFactoryBean =
                new SqlPagingQueryProviderFactoryBean();
        sqlPagingQueryProviderFactoryBean.setDataSource(dataSource);
        sqlPagingQueryProviderFactoryBean.setSelectClause(SQLQueries.SELECT_QUERY);
        sqlPagingQueryProviderFactoryBean.setFromClause(SQLQueries.FROM_CLAUSE);
        sqlPagingQueryProviderFactoryBean.setSortKey(SQLQueries.SORT_KEY);
        sqlPagingQueryProviderFactoryBean.setWhereClause(SQLQueries.WHERE_CONDITION);
        try {
            reader.setQueryProvider(sqlPagingQueryProviderFactoryBean.getObject());
        } catch (Exception e) {
            e.printStackTrace();
        }
        reader.setDataSource(dataSource);
        reader.setPageSize(100);
        reader.setRowMapper(new ModelClassRowMapper());
        return reader;
    }

    @Bean
    public ModelClassItemWriter modelClassItemWriter(){
        return new ModelClassItemWriter();
    }

    @Bean
    public Step fetchDataStep() {
        return stepBuilderFactory
                .get("fetchDataStep")
                .<ModelClass, ModelClass> chunk(50)
                .reader(modelItemReader())
                .writer(modelClassItemWriter())
                .build();
    }

    @Bean
    public Job fetchDataJob() {
        return jobBuilderFactory.get("fetchDatatJob")
                .start(fetchDataStep())
                .build();
    }
}

The writer class is as follows:

public class ModelClassItemWriter implements ItemWriter<ModelClass> {

    private static final Logger LOGGER = LoggerFactory.getLogger(ModelClassItemWriter.class);

    @Value("${rest.uri}")
    private String baseUrl;

    @Override
    public void write(List<? extends ModelClass> items) throws Exception {

        RestTemplate restTemplate = new RestTemplate();
        URI uri = new URI(baseUrl);
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);

        HttpEntity<List<? extends ModelClass>> request =
                new HttpEntity<List<? extends ModelClass>>(items, headers);
        ResponseEntity<String> result = restTemplate.postForEntity(uri, request, String.class);
        LOGGER.info("**********RESPONSE************" + result.getStatusCode());
    }
}

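Here, it sends the data to this REST API twice. The batch needs to run every 10 minutes, so I wrote a scheduler that runs the job every 10 minutes. Below are the logs showing that it executes twice.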

2021-05-14 20:50:00.070  INFO 84100 --- [   scheduling-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [fetchDataStep]
2021-05-14 20:50:32.833  INFO 84100 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [fetchDataStep] executed in 32s763ms
2021-05-14 20:50:32.837  INFO 84100 --- [   scheduling-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=fetchDatatJob]] completed with the following parameters: [{JobID=1621005600052}] and the following status: [COMPLETED] in 32s775ms
2021-05-14 20:50:33.017  INFO 84100 --- [   scheduling-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=fetchDatatJob]] launched with the following parameters: [{JobID=1621005633015}]
2021-05-14 20:50:33.021  INFO 84100 --- [   scheduling-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [fetchDataStep]
2021-05-14 20:51:02.712  INFO 84100 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [fetchDataStep] executed in 29s691ms
2021-05-14 20:51:02.716  INFO 84100 --- [   scheduling-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=fetchDatatJob]] completed with the following parameters: [{JobID=1621005633015}] and the following status: [COMPLETED] in 29s696ms
2021-05-14 21:00:00.061  INFO 84100 --- [   scheduling-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=fetchDatatJob]] launched with the following parameters: [{JobID=1621006200051}]

The scheduler code is as follows:

@Component
public class ScheduleBatchJobs {

    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    Job job;

    @Scheduled(cron = "* */10 * * * ?")
    public void perform() throws Exception
    {
        JobParameters params = new JobParametersBuilder()
                .addString("JobID", String.valueOf(System.currentTimeMillis()))
                .toJobParameters();

        jobLauncher.run(job, params);
    }
}

Can anyone help me solve this issue?

Your Spring Batch configuration looks fine. The problem is the cron expression.

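The cron expression cron = "* */10 * * * ?" fires in every 10th minute, but because its seconds field is *, it matches every second of that minute. So each time the thread finishes the job while that minute is still running, the method is triggered again. For example: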

@Scheduled(cron = "* */5 * * * ?")
public void print() throws InterruptedException {
    Thread.sleep(10000);
    System.out.println("Hello: " + Instant.now());
}

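Here, print() is meant to run every five minutes. However, each call finishes in about 10 seconds, so it is triggered again right afterwards, and print() keeps being called for the rest of that minute. Here are the logs: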

Hello: 2021-05-15T15:25:10.013Z
Hello: 2021-05-15T15:25:21.005Z
Hello: 2021-05-15T15:25:32.001Z
Hello: 2021-05-15T15:25:43.002Z
Hello: 2021-05-15T15:25:54.005Z

Hello: 2021-05-15T15:30:10.009Z
Hello: 2021-05-15T15:30:21.001Z
Hello: 2021-05-15T15:30:32.002Z
Hello: 2021-05-15T15:30:43.001Z
Hello: 2021-05-15T15:30:54.005Z
Hello: 2021-05-15T15:31:05.002Z

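In your case, the job finishes in about 30 seconds, so the cron expression triggers perform() again while it is still inside the same one-minute window. Change your cron expression to: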

@Scheduled(cron = "0 0/10 * * * ?")

This triggers perform() only once every 10 minutes (at second 0 of minutes 0, 10, 20, ...), even if the job finishes within a minute.
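To see why the first expression produces two launches in the log above, here is a minimal, stdlib-only sketch. It does not use Spring's cron parser: the class CronOverlapDemo and its methods are hypothetical names, and the model hard-codes only the seconds and minutes fields of the two expressions (all other fields are * or ? in both). It assumes, roughly as with Spring's default single scheduling thread, that runs never overlap and that the next fire time is the first matching second after the previous run finished. The 32-second job duration is taken from the log.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class CronOverlapDemo {

    // Simplified model of "* */10 * * * ?": every second of minutes 0, 10, 20, ...
    static final Predicate<LocalDateTime> EVERY_SECOND_OF_10TH_MINUTE =
            t -> t.getMinute() % 10 == 0;

    // Simplified model of "0 0/10 * * * ?": only second 0 of minutes 0, 10, 20, ...
    static final Predicate<LocalDateTime> ONCE_PER_10_MINUTES =
            t -> t.getMinute() % 10 == 0 && t.getSecond() == 0;

    // Returns the first whole second strictly after 'after' that matches the pattern.
    static LocalDateTime next(Predicate<LocalDateTime> cron, LocalDateTime after) {
        LocalDateTime t = after.withNano(0).plusSeconds(1);
        while (!cron.test(t)) {
            t = t.plusSeconds(1);
        }
        return t;
    }

    // Simulates one scheduling thread: a run never overlaps the previous one, and
    // the next fire time is computed from the moment the previous run finished.
    static List<LocalDateTime> launches(Predicate<LocalDateTime> cron,
                                        LocalDateTime start, LocalDateTime end,
                                        long jobSeconds) {
        List<LocalDateTime> result = new ArrayList<>();
        LocalDateTime t = next(cron, start.minusSeconds(1));
        while (t.isBefore(end)) {
            result.add(t);
            LocalDateTime finished = t.plusSeconds(jobSeconds);
            t = next(cron, finished);
        }
        return result;
    }

    public static void main(String[] args) {
        LocalDateTime start = LocalDateTime.of(2021, 5, 14, 20, 50, 0);
        LocalDateTime end = start.plusMinutes(20);
        System.out.println("* */10 * * * ? -> "
                + launches(EVERY_SECOND_OF_10TH_MINUTE, start, end, 32));
        System.out.println("0 0/10 * * * ? -> "
                + launches(ONCE_PER_10_MINUTES, start, end, 32));
    }
}
```

In this model, "* */10 * * * ?" launches at 20:50:00, 20:50:33, 21:00:00 and 21:00:33 (the second launch lines up with the duplicate run at 20:50:33 in the log), while "0 0/10 * * * ?" launches only at 20:50:00 and 21:00:00.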