重复 tasklet 步骤直到特定条件

Repeat tasklet step until certain condition

我使用 Spring 批处理来构建 ETL 作业。我的主要工作只是从一个数据库中读取并写入另一个数据库。在我的主要工作之前,我需要检查源数据库中的状态以查看它是否准备就绪。仅当源数据库准备就绪时,我才会继续执行主要工作。我将检查状态逻辑实现为一个 Tasklet,并使用简单的逻辑构建我的工作,如果检查状态步骤失败,请重复此步骤直到该步骤成功,然后继续执行主要工作。我按如下方式构建作业:

@Bean
public Job myJob(MyListener listener) {

    return jobBuilderFactory.get(jobName)
                            .incrementer(new RunIdIncrementer())
                            .listener(listener)
                            .start(checkStatusStep())
                            .on("*").to(mainStep())
                            .on(ExitStatus.FAILED.getExitCode())
                            .to(checkStatusStep())
                            .end()
                            .build();
}

检查状态步骤是一个tasklet如下:

@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws InterruptedException {
    //isReady method check the db to see if it is ready
    if (isReady()) {
        //contribution.setExitStatus(ExitStatus.COMPLETED);
        return RepeatStatus.FINISHED;
    } else {
        //contribution.setExitStatus(ExitStatus.FAILED);
        return RepeatStatus.CONTINUABLE;
    }
}

我阅读了spring批处理文档,了解到条件跟随中的ON方法将检查步骤的退出状态,因此我在StepContribution中设置了退出状态。让我感到困惑的是,当我注释掉这两行时,代码仍然有效。

所以我的问题是,首先,如果条件流程检查退出状态,为什么我的代码在没有显式更改退出状态的情况下工作?第二,为什么tasklet return 是repeat status,这个repeat status 被谁消费了。 三、有没有更好的方法来实现我的目标?

if the conditional flow checks the exit status why my code works without explicitly change the exit status?

因为默认情况下,如果tasklet returns RepeatStatus.FINISHED,它的退出代码将被设置为COMPLETED

Second, why tasklet return a repeat status and by whom this repeat status is consumed.

一个 tasklet 被 TaskletStep 重复调用,直到它 returns RepeatStatus.FINISHED 或抛出一个异常来表示失败。对 Tasklet 的每次调用都包含在一个事务中。因此,该结构是循环内部具有事务边界的循环结构。

Third, is there a better way to achieve my goal?

IMO,您的 Tasklet 实施没有问题,您不需要退出状态。类似于:

@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws InterruptedException {
   //isReady method check the db to see if it is ready
   if (isReady()) {
    return RepeatStatus.FINISHED;
   } else {
    return RepeatStatus.CONTINUABLE;
   }
}

这将持续 运行 直到 isReady 为真,因此您无需在作业级别使用流程实现迭代。作业定义为:

@Bean
public Job myJob(MyListener listener) {

   return jobBuilderFactory.get(jobName)
                        .incrementer(new RunIdIncrementer())
                        .listener(listener)
                        .start(checkStatusStep())
                        .next(mainStep())
                        .build();
}

希望对您有所帮助。