Spring 强制终止后批处理不更新作业存储库

Spring Batch doesn't update Job repository after force termination

我正在使用 SpringBoot 2.4.x 应用和 SpringBatch 4.3.x。我创建了一个简单的工作。 我从 CSV 文件中读取 FlatFileItemReader 的地方。我有写入 Kafka 主题的 ImportKafkaItemWriter。我将这些结合起来的一个步骤。我正在使用 SimpleJobLauncher,并且我已将 ThreadPoolTask​​Executor 设置为 JobLauncher 的 TasKExecutor。正如我所料,它工作正常。但是我有一个弹性用例,如果我终止应用程序然后重新启动应用程序并触发作业,那么它将继续并完成剩余的作业。不幸的是,它没有发生。我做了进一步调查,发现当我强行关闭应用程序时 Spring批处理作业存储库密钥表如下所示:

job_execution_id 版本 job_instance_id create_time start_time end_time 状态 exit_code exit_message last_updated job_configuration_location
1 1 1 2021-06-1609:32:43 2021-06-1609:32:43 已开始 未知 2021-06-1609:32:43

step_execution_id 版本 step_name job_execution_id start_time end_time 状态 commit_count read_count filter_count write_count read_skip_count write_skip_count process_skip_count rollback_count exit_code exit_message last_updated
1 4 productImportStep 1 2021-06-1609:32:43 已开始 3 6 0 6 0 0 0 0 正在执行 2021-06-1609:32:50

如果我手动更新这些表,其中我设置了有效的 end_time 并且状态为失败,那么我可以重新启动作业并且工作得非常好。我可以知道我需要做什么,以便 Spring Batch 可以适当地更新那些相关的存储库,并且我可以避免这些手动步骤。如果需要,我可以提供有关代码的更多信息。

If I manually update these tables where I set a valid end_time and status to FAILED then I can restart the job and works absolutely fine. May I know what I need to do so that Spring Batch can update those relevant repositories appropriately and I can avoid this manual steps

当作业突然终止时,Spring 批处理将没有机会更新其在作业存储库中的状态,因此状态停留在 STARTED。现在,当作业重新启动时,Spring Batch 拥有的唯一信息是作业存储库中的状态。仅通过查看数据库中的状态,Spring Batch 无法区分有效 运行 的作业和突然终止的作业(在这两种情况下,状态均为 STARTED ).

进入的方式确实是手动更新表以将状态标记为 FAILED 以便能够重新启动作业或 ABANDONED 以放弃它。这是您必须做出的业务决策,并且无法在框架端自动执行。有关详细信息,请参阅此处的参考文档:Aborting a Job.

您可以添加一个伪造的参数示例 Version 计数器,以便在每次执行新作业时递增,这样您就不必检查 table 数据库作业。

我的意思是mvn clean package 然后你尝试像这样启动程序: java my-jarfile.jar dest=/tmp/foo Version="0" java my-jarfile.jar dest=/tmp/foo Version="1" java my-jarfile.jar dest=/tmp/foo Version="2" 等等...或者 您可以使用 jobParameters 通过 jobLauncher 以编程方式启动作业,并使用日期参数 date = new Date().toString() 在每个新作业执行时给出带有新戳的日期

您可以使用“JVM 关闭挂钩”: 像这样:

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    if (jobExecution.isRunning()) {
        jobExecution.setEndTime(new Date());
        jobExecution.setStatus(BatchStatus.FAILED);
        jobExecution.setExitStatus(ExitStatus.FAILED);
        jobRepository.update(jobExecution);
    }
}));