Spring 在集群环境中批量正确重启未完成的作业
Spring Batch correctly restart uncompleted jobs in clustered environment
我使用以下逻辑在单节点上重启未完成的作业Spring批处理应用程序:
public void restartUncompletedJobs() {
try {
jobRegistry.register(new ReferenceJobFactory(documetPipelineJob));
List<String> jobs = jobExplorer.getJobNames();
for (String job : jobs) {
Set<JobExecution> runningJobs = jobExplorer.findRunningJobExecutions(job);
for (JobExecution runningJob : runningJobs) {
runningJob.setStatus(BatchStatus.FAILED);
runningJob.setEndTime(new Date());
jobRepository.update(runningJob);
jobOperator.restart(runningJob.getId());
}
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}
现在我正在尝试让它在双节点集群上运行。每个节点上的两个应用程序都将指向共享的 PostgreSQL 数据库。
让我们考虑以下示例:我有 2 个作业实例 - jobInstance1
现在是 运行 node1
,jobInstance2
是 运行在 node2
上。 Node1
在 jobInstance1
执行期间由于某种原因重新启动。在 node1
重新启动后,spring 批处理应用程序尝试使用上述逻辑重新启动未完成的作业 - 它看到有 2 个未完成的作业实例 - jobInstance1
和 jobInstance2
(其中在 node2
上正确 运行) 并尝试重新启动它们。这种方式而不是重新启动唯一的 jobInstance1
- 它会重新启动 jobInstance1
和 jobInstance2
.. 但是 jobInstance2
不应该重新启动,因为它现在正在 node2
.
如何在应用程序启动期间正确重启未完成的作业(在上一个应用程序终止之前)并防止像 jobInstance2
这样的作业也将被重新启动的情况?
已更新
这是下面答案中提供的解决方案:
Get the job instances of your job with JobOperator#getJobInstances
For each instance, check if there is a running execution using JobOperator#getExecutions.
2.1 If there is a running execution, move to next instance (in order to let the execution finish either successfully or with a failure)
2.2 If there is no currently running execution, check the status of the last execution and restart it if failed using JobOperator#restart.
我有一个关于#2.1 的问题 - Spring 批处理会在应用程序重新启动后自动重新启动未完成的作业并执行 运行 还是我需要手动操作才能这样做?
您的逻辑不是重新启动未完成的作业。您的逻辑当前正在执行 运行 个作业,将它们的状态设置为 FAILED
并重新启动它们。您的逻辑不应找到 运行 次执行,它应该查找 not 当前 运行 次执行,尤其是失败的执行并重新启动他们。
How to correctly restart the failed jobs and prevent the situation when the jobs like jobInstance2 will be also restarted?
在伪代码中,你需要做的是:
- 使用
JobOperator#getJobInstances
获取作业的作业实例
对于每个实例,使用JobOperator#getExecutions
检查是否有运行执行。
2.1 如果有 运行 执行,移动到下一个实例(为了让执行成功或失败完成)
2.2 如果当前没有运行执行,检查上次执行的状态,如果失败使用JobOperator#restart
.
重新启动
在您的场景中:
jobInstance1
应在步骤 2.2 中重新启动
jobInstance2
应该在步骤 2.1 中过滤,因为在节点 2 上有一个 运行 执行。
我使用以下逻辑在单节点上重启未完成的作业Spring批处理应用程序:
public void restartUncompletedJobs() {
try {
jobRegistry.register(new ReferenceJobFactory(documetPipelineJob));
List<String> jobs = jobExplorer.getJobNames();
for (String job : jobs) {
Set<JobExecution> runningJobs = jobExplorer.findRunningJobExecutions(job);
for (JobExecution runningJob : runningJobs) {
runningJob.setStatus(BatchStatus.FAILED);
runningJob.setEndTime(new Date());
jobRepository.update(runningJob);
jobOperator.restart(runningJob.getId());
}
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}
现在我正在尝试让它在双节点集群上运行。每个节点上的两个应用程序都将指向共享的 PostgreSQL 数据库。
让我们考虑以下示例:我有 2 个作业实例 - jobInstance1
现在是 运行 node1
,jobInstance2
是 运行在 node2
上。 Node1
在 jobInstance1
执行期间由于某种原因重新启动。在 node1
重新启动后,spring 批处理应用程序尝试使用上述逻辑重新启动未完成的作业 - 它看到有 2 个未完成的作业实例 - jobInstance1
和 jobInstance2
(其中在 node2
上正确 运行) 并尝试重新启动它们。这种方式而不是重新启动唯一的 jobInstance1
- 它会重新启动 jobInstance1
和 jobInstance2
.. 但是 jobInstance2
不应该重新启动,因为它现在正在 node2
.
如何在应用程序启动期间正确重启未完成的作业(在上一个应用程序终止之前)并防止像 jobInstance2
这样的作业也将被重新启动的情况?
已更新
这是下面答案中提供的解决方案:
Get the job instances of your job with JobOperator#getJobInstances
For each instance, check if there is a running execution using JobOperator#getExecutions.
2.1 If there is a running execution, move to next instance (in order to let the execution finish either successfully or with a failure)
2.2 If there is no currently running execution, check the status of the last execution and restart it if failed using JobOperator#restart.
我有一个关于#2.1 的问题 - Spring 批处理会在应用程序重新启动后自动重新启动未完成的作业并执行 运行 还是我需要手动操作才能这样做?
您的逻辑不是重新启动未完成的作业。您的逻辑当前正在执行 运行 个作业,将它们的状态设置为 FAILED
并重新启动它们。您的逻辑不应找到 运行 次执行,它应该查找 not 当前 运行 次执行,尤其是失败的执行并重新启动他们。
How to correctly restart the failed jobs and prevent the situation when the jobs like jobInstance2 will be also restarted?
在伪代码中,你需要做的是:
- 使用
JobOperator#getJobInstances
获取作业的作业实例
对于每个实例,使用
JobOperator#getExecutions
检查是否有运行执行。2.1 如果有 运行 执行,移动到下一个实例(为了让执行成功或失败完成)
2.2 如果当前没有运行执行,检查上次执行的状态,如果失败使用
JobOperator#restart
. 重新启动
在您的场景中:
jobInstance1
应在步骤 2.2 中重新启动jobInstance2
应该在步骤 2.1 中过滤,因为在节点 2 上有一个 运行 执行。