Spring 批处理 - 重新启动使用 TaskExecutor 的失败作业
Spring batch - restart FAILED job that uses TaskExecutor
对于在 TaskExecutor 中运行的 Spring Batch 作业,重新启动状态为 FAILED 的作业的正确方法是什么?
我有一个通过 HTTP 加载数据的作业,服务端有时会返回 500 错误,导致该作业失败。我想重新启动这个作业,直到它成功为止。
如果我创建一个 JobExecutionListener 并在 afterJob() 方法中实现重启逻辑,会收到错误消息,提示该作业实际上仍在运行。如果我使用 Spring 的 RetryTemplate,同样不起作用,因为作业是在 TaskExecutor 中运行的。
任何代码示例都会有很大帮助。
最后我通过重新实现 JobLauncher 解决了这个问题:
/**
 * {@link JobLauncher} that runs a job through a {@link TaskExecutor} and, using a {@link RetryTemplate},
 * restarts it for as long as the job is restartable and finishes with {@link BatchStatus#FAILED}.
 *
 * <p>Every restart creates a new {@link JobExecution} in the {@link JobRepository}. The execution
 * returned by {@link #run(Job, JobParameters)} belongs to the first attempt only.
 */
public class FaultTolerantJobLauncher implements JobLauncher, InitializingBean {

    protected static final Log logger = LogFactory.getLog(FaultTolerantJobLauncher.class);

    private JobRepository jobRepository;

    private RetryTemplate retryTemplate;

    private TaskExecutor taskExecutor;

    /**
     * Run the provided job with the given {@link JobParameters}, restarting it through the configured
     * {@link RetryTemplate} each time a restartable job finishes with status {@link BatchStatus#FAILED}.
     * The {@link JobParameters} determine whether this is an execution of an existing job instance or
     * whether a new one is created.
     *
     * <p>The first {@link JobExecution} is created on the caller's thread before the work is handed to
     * the {@link TaskExecutor}. The return value is therefore non-null even with an asynchronous
     * executor, and launch problems reach the caller as the declared checked exceptions.
     *
     * @param job the job to be run.
     * @param jobParameters the {@link JobParameters} for this particular execution.
     * @return the {@link JobExecution} of the first attempt. Restarts create further executions in the
     *     {@link JobRepository}; this object does not track them.
     * @throws JobExecutionAlreadyRunningException if the JobInstance already exists and has an execution
     *     already running.
     * @throws JobRestartException if the execution would be a re-start, but a re-start is either not
     *     allowed or not needed.
     * @throws JobInstanceAlreadyCompleteException if this instance has already completed successfully
     * @throws JobParametersInvalidException if the job's parameters validator rejects {@code jobParameters}
     */
    @Override
    public JobExecution run(final Job job, final JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException,
            JobParametersInvalidException {
        Assert.notNull(job, "The Job must not be null.");
        Assert.notNull(jobParameters, "The JobParameters must not be null.");

        JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters);
        if (lastExecution != null) {
            if (!job.isRestartable()) {
                throw new JobRestartException("JobInstance already exists and is not restartable");
            }
            // Refuse to restart while a step of the previous execution is still running or stopping,
            // or when a step was left in UNKNOWN status.
            for (StepExecution execution : lastExecution.getStepExecutions()) {
                BatchStatus status = execution.getStatus();
                if (status.isRunning() || status == BatchStatus.STOPPING) {
                    throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: "
                            + lastExecution);
                } else if (status == BatchStatus.UNKNOWN) {
                    throw new JobRestartException(
                            "Cannot restart step [" + execution.getStepName() + "] from UNKNOWN status. "
                                    + "The last execution ended with a failure that could not be rolled back, "
                                    + "so it may be dangerous to proceed. Manual intervention is probably necessary.");
                }
            }
        }

        // Check the validity of the parameters before creating anything in the repository.
        job.getJobParametersValidator().validate(jobParameters);

        /*
         * The first execution is created here, on the caller's thread, rather than inside the Runnable.
         * With an asynchronous TaskExecutor the Runnable usually has not run yet when this method
         * returns, so creating it there made run() return null. It also sent the checked launch
         * exceptions to the executor thread instead of the caller.
         *
         * There is a very small probability that a non-restartable job can be restarted, but only if
         * another process or thread manages to launch <i>and</i> fail a job execution for this instance
         * between the last assertion and the next method returning successfully.
         */
        final JobExecution firstExecution = jobRepository.createJobExecution(job.getName(), jobParameters);
        final AtomicReference<JobExecution> executionReference = new AtomicReference<>(firstExecution);

        try {
            taskExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        retryTemplate.execute(new FaultTolerantJobRetryCallback(executionReference, job, jobParameters));
                    } catch (TaskRejectedException e) {
                        // The callback throws this when a restartable run ends FAILED; reaching this point
                        // means the RetryTemplate stopped retrying, so record it on the latest execution.
                        markFailed(executionReference.get(), e);
                    }
                }
            });
        } catch (TaskRejectedException e) {
            // The executor refused the task, so the job never started.
            markFailed(firstExecution, e);
        }
        return firstExecution;
    }

    /**
     * Upgrade {@code execution}'s status to FAILED, store {@code cause} in its exit status when that is
     * still UNKNOWN, and persist the execution.
     *
     * @param execution the execution to mark; must not be {@code null}
     * @param cause the exception to record in the exit description
     */
    private void markFailed(JobExecution execution, Throwable cause) {
        execution.upgradeStatus(BatchStatus.FAILED);
        if (execution.getExitStatus().equals(ExitStatus.UNKNOWN)) {
            execution.setExitStatus(ExitStatus.FAILED.addExitDescription(cause));
        }
        jobRepository.update(execution);
    }

    /**
     * Set the JobRepository.
     *
     * @param jobRepository the repository used to look up and create job executions
     */
    public void setJobRepository(JobRepository jobRepository) {
        this.jobRepository = jobRepository;
    }

    /**
     * Set the RetryTemplate that decides how often a FAILED job is restarted.
     *
     * @param retryTemplate the retry template (required)
     */
    public void setRetryTemplate(RetryTemplate retryTemplate) {
        this.retryTemplate = retryTemplate;
    }

    /**
     * Set the TaskExecutor. (Optional)
     *
     * @param taskExecutor the executor that runs the job and its restarts
     */
    public void setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    /**
     * Ensure the required {@link JobRepository} and {@link RetryTemplate} have been set, and fall back to
     * a {@link SyncTaskExecutor} when no {@link TaskExecutor} was provided.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.state(jobRepository != null, "A JobRepository has not been set.");
        Assert.state(retryTemplate != null, "A RetryTemplate has not been set.");
        if (taskExecutor == null) {
            logger.info("No TaskExecutor has been set, defaulting to synchronous executor.");
            taskExecutor = new SyncTaskExecutor();
        }
    }

    /**
     * Retry callback that runs the job once per attempt. When a restartable job ends with status
     * {@link BatchStatus#FAILED}, it throws {@link TaskRejectedException} so that the
     * {@link RetryTemplate} schedules a restart.
     */
    private class FaultTolerantJobRetryCallback implements RetryCallback<Object, RuntimeException> {

        private final AtomicReference<JobExecution> executionReference;
        private final Job job;
        private final JobParameters jobParameters;
        // The first attempt reuses the execution already created in run(); restarts create their own.
        private boolean firstAttempt = true;

        FaultTolerantJobRetryCallback(AtomicReference<JobExecution> executionReference, Job job,
                JobParameters jobParameters) {
            this.executionReference = executionReference;
            this.job = job;
            this.jobParameters = jobParameters;
        }

        @Override
        public Object doWithRetry(RetryContext retryContext) {
            if (!job.isRestartable()) {
                // A job that cannot be restarted gets exactly one attempt.
                retryContext.setExhaustedOnly();
            }
            if (retryContext.getRetryCount() > 0) {
                logger.info("Job: [" + job + "] retrying/restarting with the following parameters: [" + jobParameters
                        + "]");
            }
            try {
                JobExecution execution;
                if (firstAttempt) {
                    firstAttempt = false;
                    execution = executionReference.get();
                } else {
                    execution = jobRepository.createJobExecution(job.getName(), jobParameters);
                    executionReference.set(execution);
                }
                logger.info("Job: [" + job + "] launched with the following parameters: [" + jobParameters
                        + "]");
                job.execute(execution);
                logger.info("Job: [" + job + "] completed with the following parameters: [" + jobParameters
                        + "] and the following status: [" + execution.getStatus() + "]");
            } catch (JobInstanceAlreadyCompleteException | JobExecutionAlreadyRunningException
                    | JobRestartException e) {
                // Retrying cannot change these outcomes: the instance is complete, already running, or
                // may not be restarted.
                retryContext.setExhaustedOnly();
                rethrow(e);
            } catch (Throwable t) {
                logger.info("Job: [" + job
                        + "] failed unexpectedly and fatally with the following parameters: [" + jobParameters
                        + "]", t);
                rethrow(t);
            }
            JobExecution finished = executionReference.get();
            if (job.isRestartable() && finished.getStatus() == BatchStatus.FAILED) {
                // Signal the RetryTemplate to restart the job.
                throw new TaskRejectedException("Job: [" + job + "] finished with status FAILED; requesting a restart");
            }
            return null;
        }

        /** Rethrow unchecked throwables as-is and wrap checked ones in an {@link IllegalStateException}. */
        private void rethrow(Throwable t) {
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else if (t instanceof Error) {
                throw (Error) t;
            }
            throw new IllegalStateException(t);
        }
    }
}
对于在 TaskExecutor 中运行的 Spring Batch 作业,重新启动状态为 FAILED 的作业的正确方法是什么?
我有一个通过 HTTP 加载数据的作业,服务端有时会返回 500 错误,导致该作业失败。我想重新启动这个作业,直到它成功为止。
如果我创建一个 JobExecutionListener 并在 afterJob() 方法中实现重启逻辑,会收到错误消息,提示该作业实际上仍在运行。如果我使用 Spring 的 RetryTemplate,同样不起作用,因为作业是在 TaskExecutor 中运行的。
任何代码示例都会有很大帮助。
最后我通过重新实现 JobLauncher 解决了这个问题:
/**
 * {@link JobLauncher} that runs a job through a {@link TaskExecutor} and, using a {@link RetryTemplate},
 * restarts it for as long as the job is restartable and finishes with {@link BatchStatus#FAILED}.
 *
 * <p>Every restart creates a new {@link JobExecution} in the {@link JobRepository}. The execution
 * returned by {@link #run(Job, JobParameters)} belongs to the first attempt only.
 */
public class FaultTolerantJobLauncher implements JobLauncher, InitializingBean {

    protected static final Log logger = LogFactory.getLog(FaultTolerantJobLauncher.class);

    private JobRepository jobRepository;

    private RetryTemplate retryTemplate;

    private TaskExecutor taskExecutor;

    /**
     * Run the provided job with the given {@link JobParameters}, restarting it through the configured
     * {@link RetryTemplate} each time a restartable job finishes with status {@link BatchStatus#FAILED}.
     * The {@link JobParameters} determine whether this is an execution of an existing job instance or
     * whether a new one is created.
     *
     * <p>The first {@link JobExecution} is created on the caller's thread before the work is handed to
     * the {@link TaskExecutor}. The return value is therefore non-null even with an asynchronous
     * executor, and launch problems reach the caller as the declared checked exceptions.
     *
     * @param job the job to be run.
     * @param jobParameters the {@link JobParameters} for this particular execution.
     * @return the {@link JobExecution} of the first attempt. Restarts create further executions in the
     *     {@link JobRepository}; this object does not track them.
     * @throws JobExecutionAlreadyRunningException if the JobInstance already exists and has an execution
     *     already running.
     * @throws JobRestartException if the execution would be a re-start, but a re-start is either not
     *     allowed or not needed.
     * @throws JobInstanceAlreadyCompleteException if this instance has already completed successfully
     * @throws JobParametersInvalidException if the job's parameters validator rejects {@code jobParameters}
     */
    @Override
    public JobExecution run(final Job job, final JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException,
            JobParametersInvalidException {
        Assert.notNull(job, "The Job must not be null.");
        Assert.notNull(jobParameters, "The JobParameters must not be null.");

        JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters);
        if (lastExecution != null) {
            if (!job.isRestartable()) {
                throw new JobRestartException("JobInstance already exists and is not restartable");
            }
            // Refuse to restart while a step of the previous execution is still running or stopping,
            // or when a step was left in UNKNOWN status.
            for (StepExecution execution : lastExecution.getStepExecutions()) {
                BatchStatus status = execution.getStatus();
                if (status.isRunning() || status == BatchStatus.STOPPING) {
                    throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: "
                            + lastExecution);
                } else if (status == BatchStatus.UNKNOWN) {
                    throw new JobRestartException(
                            "Cannot restart step [" + execution.getStepName() + "] from UNKNOWN status. "
                                    + "The last execution ended with a failure that could not be rolled back, "
                                    + "so it may be dangerous to proceed. Manual intervention is probably necessary.");
                }
            }
        }

        // Check the validity of the parameters before creating anything in the repository.
        job.getJobParametersValidator().validate(jobParameters);

        /*
         * The first execution is created here, on the caller's thread, rather than inside the Runnable.
         * With an asynchronous TaskExecutor the Runnable usually has not run yet when this method
         * returns, so creating it there made run() return null. It also sent the checked launch
         * exceptions to the executor thread instead of the caller.
         *
         * There is a very small probability that a non-restartable job can be restarted, but only if
         * another process or thread manages to launch <i>and</i> fail a job execution for this instance
         * between the last assertion and the next method returning successfully.
         */
        final JobExecution firstExecution = jobRepository.createJobExecution(job.getName(), jobParameters);
        final AtomicReference<JobExecution> executionReference = new AtomicReference<>(firstExecution);

        try {
            taskExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        retryTemplate.execute(new FaultTolerantJobRetryCallback(executionReference, job, jobParameters));
                    } catch (TaskRejectedException e) {
                        // The callback throws this when a restartable run ends FAILED; reaching this point
                        // means the RetryTemplate stopped retrying, so record it on the latest execution.
                        markFailed(executionReference.get(), e);
                    }
                }
            });
        } catch (TaskRejectedException e) {
            // The executor refused the task, so the job never started.
            markFailed(firstExecution, e);
        }
        return firstExecution;
    }

    /**
     * Upgrade {@code execution}'s status to FAILED, store {@code cause} in its exit status when that is
     * still UNKNOWN, and persist the execution.
     *
     * @param execution the execution to mark; must not be {@code null}
     * @param cause the exception to record in the exit description
     */
    private void markFailed(JobExecution execution, Throwable cause) {
        execution.upgradeStatus(BatchStatus.FAILED);
        if (execution.getExitStatus().equals(ExitStatus.UNKNOWN)) {
            execution.setExitStatus(ExitStatus.FAILED.addExitDescription(cause));
        }
        jobRepository.update(execution);
    }

    /**
     * Set the JobRepository.
     *
     * @param jobRepository the repository used to look up and create job executions
     */
    public void setJobRepository(JobRepository jobRepository) {
        this.jobRepository = jobRepository;
    }

    /**
     * Set the RetryTemplate that decides how often a FAILED job is restarted.
     *
     * @param retryTemplate the retry template (required)
     */
    public void setRetryTemplate(RetryTemplate retryTemplate) {
        this.retryTemplate = retryTemplate;
    }

    /**
     * Set the TaskExecutor. (Optional)
     *
     * @param taskExecutor the executor that runs the job and its restarts
     */
    public void setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    /**
     * Ensure the required {@link JobRepository} and {@link RetryTemplate} have been set, and fall back to
     * a {@link SyncTaskExecutor} when no {@link TaskExecutor} was provided.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.state(jobRepository != null, "A JobRepository has not been set.");
        Assert.state(retryTemplate != null, "A RetryTemplate has not been set.");
        if (taskExecutor == null) {
            logger.info("No TaskExecutor has been set, defaulting to synchronous executor.");
            taskExecutor = new SyncTaskExecutor();
        }
    }

    /**
     * Retry callback that runs the job once per attempt. When a restartable job ends with status
     * {@link BatchStatus#FAILED}, it throws {@link TaskRejectedException} so that the
     * {@link RetryTemplate} schedules a restart.
     */
    private class FaultTolerantJobRetryCallback implements RetryCallback<Object, RuntimeException> {

        private final AtomicReference<JobExecution> executionReference;
        private final Job job;
        private final JobParameters jobParameters;
        // The first attempt reuses the execution already created in run(); restarts create their own.
        private boolean firstAttempt = true;

        FaultTolerantJobRetryCallback(AtomicReference<JobExecution> executionReference, Job job,
                JobParameters jobParameters) {
            this.executionReference = executionReference;
            this.job = job;
            this.jobParameters = jobParameters;
        }

        @Override
        public Object doWithRetry(RetryContext retryContext) {
            if (!job.isRestartable()) {
                // A job that cannot be restarted gets exactly one attempt.
                retryContext.setExhaustedOnly();
            }
            if (retryContext.getRetryCount() > 0) {
                logger.info("Job: [" + job + "] retrying/restarting with the following parameters: [" + jobParameters
                        + "]");
            }
            try {
                JobExecution execution;
                if (firstAttempt) {
                    firstAttempt = false;
                    execution = executionReference.get();
                } else {
                    execution = jobRepository.createJobExecution(job.getName(), jobParameters);
                    executionReference.set(execution);
                }
                logger.info("Job: [" + job + "] launched with the following parameters: [" + jobParameters
                        + "]");
                job.execute(execution);
                logger.info("Job: [" + job + "] completed with the following parameters: [" + jobParameters
                        + "] and the following status: [" + execution.getStatus() + "]");
            } catch (JobInstanceAlreadyCompleteException | JobExecutionAlreadyRunningException
                    | JobRestartException e) {
                // Retrying cannot change these outcomes: the instance is complete, already running, or
                // may not be restarted.
                retryContext.setExhaustedOnly();
                rethrow(e);
            } catch (Throwable t) {
                logger.info("Job: [" + job
                        + "] failed unexpectedly and fatally with the following parameters: [" + jobParameters
                        + "]", t);
                rethrow(t);
            }
            JobExecution finished = executionReference.get();
            if (job.isRestartable() && finished.getStatus() == BatchStatus.FAILED) {
                // Signal the RetryTemplate to restart the job.
                throw new TaskRejectedException("Job: [" + job + "] finished with status FAILED; requesting a restart");
            }
            return null;
        }

        /** Rethrow unchecked throwables as-is and wrap checked ones in an {@link IllegalStateException}. */
        private void rethrow(Throwable t) {
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else if (t instanceof Error) {
                throw (Error) t;
            }
            throw new IllegalStateException(t);
        }
    }
}