使 Spring 4 批处理服务可按计划执行重复

Make Spring 4 Batch service repeatable on Scheduled execution

几天来我一直在尝试解决这个问题,但我做不到。我对 Spring 4 很陌生,所以也许有经验的人可以告诉我该怎么做。

我正在尝试学习如何使用最新版本的框架编写 Spring 批处理。我遵循了入门教程: https://spring.io/guides/gs/batch-processing/

作业执行后,我想让它定期执行。因此,我去了一个计划任务: https://spring.io/guides/gs/scheduling-tasks/

将这两个想法放在一起看起来很容易。实际上,这是不可能的。 我读了一点,然后添加了一些代码,使 jobExecution 每次都独一无二。尽管如此,编码步骤一旦完成,就再也不会执行了。

我进行了一些调查并阅读了有关 RepeatTemplate 的内容,但我不清楚如何使其适合我的代码。这里有 3 个相关的执行方法:

@Scheduled(cron="0 0/2 * * * ?")
public void run() throws Exception {
    System.out.println("Job Started at :" + new Date());

    JobParameters param = new JobParametersBuilder().addString("newsSyncJob",
            String.valueOf(System.currentTimeMillis())).toJobParameters();

    NewsJobCompletionListener listener = new NewsJobCompletionListener();
    JobExecution execution = jobLauncher.run(newsSyncJob(listener), param);

    System.out.println("Job finished with status :" + execution.getStatus());
}


/**
 * Execution of the job
 * @param listener
 * @return
 */
@Bean
public Job newsSyncJob(NewsJobCompletionListener listener) {
    log.debug("newsSyncJob execution started");
    this.init();
    return jobBuilderFactory.get("newsSyncJob")
            .incrementer(new RunIdIncrementer())
            .listener(listener)
            .flow(step1())
            .end()
            .build();
}

@Bean
protected Step step1() {
    return stepBuilderFactory.get("step1")
            .<NewsSync, NewsSync> chunk(4)
            .reader(filesReader()).
            processor(newsProcessor()).
            writer(stateWriter()).
            build();
}

任何想法如何使步骤重新执行成功? 提前致谢!

在M.Deinum的评论之后,我做了更多的调查,我可以自己解决。以下是一步一步的解释:

配置错误,因为 newsSyncJob() 应该只执行 一次 。它构建作业并且 bean 保持创建为单例。可以根据需要执行多次的单例。 步骤 bean 也会发生同样的情况。 step1() 只调用一次来创建单例 bean。作为作业的一部分,它可能会在作业执行时执行...只要配置正确。目前,这两种方法如下所示:

@Bean
public Job newsSyncJob() {
    log.debug("Building batch newsSyncJob...");
    this.init();
    return jobBuilderFactory.get("newsSyncJob")
            .incrementer(new RunIdIncrementer())
            .start(step1())
            .build();
}

@Bean
protected Step step1() {
    return stepBuilderFactory.get("step1")
            .<NewsSync, NewsSync> chunk(4)
            .reader(filesReader()).
            processor(newsProcessor()).
            writer(stateWriter()).
            build();
}

现在,要让步骤在我们每次 运行 作业 时都执行,需要更改其范围。该步骤由 itemReader + itemProcessor + itemWriter 组成。默认情况下,它们的范围是 Prototype。这意味着它会保持上一次执行的状态,不会再 运行 了。要再次使其成为 运行,需要将范围更改为 Step 范围。代码将如下所示:

@Bean
@StepScope
ItemReader<NewsSync> filesReader() {
    return new NewsSyncReader(srcPath);
}

@Bean
@StepScope
ItemProcessor<NewsSync, NewsSync> newsProcessor() {
    return new NewsSyncProcessor(jdbcTemplate, newsRepository);
}

@Bean
@StepScope
ItemWriter<NewsSync> stateWriter() {
    return new NewsSyncWriter(jdbcTemplate);
}

现在要完成,计划的调用将使用之前 class 中创建的 Beans 的依赖注入在不同的 class 中进行配置。像这样:

@EnableScheduling
@Service
public class BatchScheduled {

    private static final Logger log = LoggerFactory.getLogger(BatchScheduled.class);

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    @Qualifier("newsSyncJob")
    private Job newsJobSync;

    /**
     * Scheduled run of the batch
     * @throws Exception
     */
    @Scheduled(cron="0 0/2 * * * ?")
    public void run() throws Exception {
        log.info("newsJobSync started");
        System.out.println("Job Started at :" + new Date());

        JobParameters param = new JobParametersBuilder().addString("newsSyncJob",
                String.valueOf(System.currentTimeMillis())).toJobParameters();

        JobExecution execution = jobLauncher.run(newsJobSync, param);

        log.info("newsJobSync finished with status " + execution.getExitStatus());
    }
}

我们终于有了一个 Spring 使用预定表达式执行的批处理。

添加@Marc Loan 的答案,itemReader + itemProcessor + itemWriter。默认情况下,它们的范围是 Singleton 而不是 Prototype 或 StepScope,这使得它们的关联实例保留它们的最后状态。