Spring Batch restart persistent jobs after abnormal termination
I have the following Spring Batch job configuration:
@Configuration
@EnableBatchProcessing
public class JobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
                .flow(stepA()).on("FAILED").to(stepC())
                .from(stepA()).on("*").to(stepB()).next(stepC())
                .end().build();
    }

    @Bean
    public Step stepA() {
        return stepBuilderFactory.get("stepA").tasklet(new RandomFailTasket("stepA")).build();
    }

    @Bean
    public Step stepB() {
        return stepBuilderFactory.get("stepB").tasklet(new PrintTextTasklet("stepB")).build();
    }

    @Bean
    public Step stepC() {
        return stepBuilderFactory.get("stepC").tasklet(new PrintTextTasklet("stepC")).build();
    }
}
I start the job with the following code:
try {
    Map<String, JobParameter> parameters = new HashMap<>();
    JobParameter ccReportIdParameter = new JobParameter("03061980");
    parameters.put("ccReportId", ccReportIdParameter);
    jobLauncher.run(job, new JobParameters(parameters));
} catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException
        | JobParametersInvalidException e) {
    e.printStackTrace();
}
This is my test tasklet:
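public class PrintTextTasklet implements Tasklet {

    private final String name;

    // JobConfig creates this tasklet with the step name, so the constructor accepts it.
    public PrintTextTasklet(String name) {
        this.name = name;
    }

    @Override
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
        String ccReportId = chunkContext.getStepContext().getStepExecution().getJobParameters().getString("ccReportId");
        System.out.println(name + " ccReportId: " + ccReportId);
        // Sleep for a minute so there is time to kill the application mid-step.
        Thread.sleep(60 * 1000);
        return RepeatStatus.FINISHED;
    }
}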
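I use an H2 database as the persistent store for my jobs.
I terminate my application while a job is executing. After the application restarts, I expect all uncompleted jobs to continue from the step at which they were terminated, but nothing happens.
Also, I have added the following property to my application.properties file: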
spring.batch.job.enabled=false
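because I don't want Spring Batch to start new (not terminated) jobs automatically. I need to start all new jobs manually (on user request) and restart all uncompleted jobs after the next application run.
How do I configure Spring Batch for this case?
UPDATED
Right now I'm trying to restart the jobs with the following method: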
public void restartUncompletedJobs() {
    List<String> jobs = jobExplorer.getJobNames();
    for (String job : jobs) {
        Set<JobExecution> runningJobs = jobExplorer.findRunningJobExecutions(job);
        for (JobExecution runningJob : runningJobs) {
            try {
                jobOperator.restart(runningJob.getId());
                logger.info("Restarted: " + runningJob);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
    }
}
but it fails with the following exception:
org.springframework.batch.core.launch.NoSuchJobException: No job configuration with the name [job] was registered
at org.springframework.batch.core.configuration.support.MapJobRegistry.getJob(MapJobRegistry.java:66) ~[spring-batch-core-3.0.5.RELEASE.jar:3.0.5.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_60]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_60]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_60]
at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_60]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:302) ~[spring-aop-4.2.0.RELEASE.jar:4.2.0.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) ~[spring-aop-4.2.0.RELEASE.jar:4.2.0.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) ~[spring-aop-4.2.0.RELEASE.jar:4.2.0.RELEASE]
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) ~[spring-batch-core-3.0.5.RELEASE.jar:3.0.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) ~[spring-aop-4.2.0.RELEASE.jar:4.2.0.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207) ~[spring-aop-4.2.0.RELEASE.jar:4.2.0.RELEASE]
at com.sun.proxy.$Proxy94.getJob(Unknown Source) ~[na:na]
at org.springframework.batch.core.launch.support.SimpleJobOperator.restart(SimpleJobOperator.java:275) ~[spring-batch-core-3.0.5.RELEASE.jar:3.0.5.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_60]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_60]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_60]
at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_60]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:302) ~[spring-aop-4.2.0.RELEASE.jar:4.2.0.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:201) ~[spring-aop-4.2.0.RELEASE.jar:4.2.0.RELEASE]
at com.sun.proxy.$Proxy96.restart(Unknown Source) ~[na:na]
at com.example.domain.api.batch.job.ReportJobServiceImpl.restartUncompletedJobs(ReportJobServiceImpl.java:72) ~[classes/:na]
at com.example.domain.api.Application.lambda[=16=](Application.java:46) [classes/:na]
at org.springframework.boot.SpringApplication.runCommandLineRunners(SpringApplication.java:672) ~[spring-boot-1.2.5.RELEASE.jar:1.2.5.RELEASE]
at org.springframework.boot.SpringApplication.afterRefresh(SpringApplication.java:690) ~[spring-boot-1.2.5.RELEASE.jar:1.2.5.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:321) ~[spring-boot-1.2.5.RELEASE.jar:1.2.5.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:957) ~[spring-boot-1.2.5.RELEASE.jar:1.2.5.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:946) ~[spring-boot-1.2.5.RELEASE.jar:1.2.5.RELEASE]
at com.example.domain.api.Application.main(Application.java:53) [classes/:na]
I got it working by adding the following line
jobRegistry.register(new ReferenceJobFactory(job));
to the restartUncompletedJobs method:
public void restartUncompletedJobs() {
    try {
        jobRegistry.register(new ReferenceJobFactory(job));
        List<String> jobs = jobExplorer.getJobNames();
        for (String job : jobs) {
            Set<JobExecution> runningJobs = jobExplorer.findRunningJobExecutions(job);
            for (JobExecution runningJob : runningJobs) {
                runningJob.setStatus(BatchStatus.FAILED);
                runningJob.setEndTime(new Date());
                jobRepository.update(runningJob);
                jobOperator.restart(runningJob.getId());
                logger.info("Restarted: " + runningJob);
            }
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
}
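For anyone wondering why the method above also sets the status and end time before calling restart: as far as I can tell, an execution that was killed mid-run stays recorded without an end time, and the repository treats such a record as still running and refuses to create a new execution for the same job. Below is a toy, plain-Java model of that check. It is not Spring Batch code: StaleExecutionDemo, ToyExecution, ToyRepository and tryRestart are hypothetical names invented purely for illustration, and the real framework may apply further checks.

```java
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class StaleExecutionDemo {

    /** Hypothetical stand-in for a persisted job execution record. */
    static class ToyExecution {
        final String jobName;
        String status = "STARTED";
        Date endTime; // null means the record is still considered running

        ToyExecution(String jobName) {
            this.jobName = jobName;
        }

        boolean isRunning() {
            return endTime == null;
        }
    }

    /** Hypothetical stand-in for the job repository. */
    static class ToyRepository {
        final List<ToyExecution> executions = new ArrayList<>();

        /** Creates a new execution unless one for the same job still looks running. */
        ToyExecution createExecution(String jobName) {
            for (ToyExecution e : executions) {
                if (e.jobName.equals(jobName) && e.isRunning()) {
                    throw new IllegalStateException("Execution already running for job: " + jobName);
                }
            }
            ToyExecution created = new ToyExecution(jobName);
            executions.add(created);
            return created;
        }
    }

    /** Attempts a restart; returns true if a new execution could be created. */
    static boolean tryRestart(ToyRepository repository, String jobName) {
        try {
            repository.createExecution(jobName);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ToyRepository repository = new ToyRepository();

        // First run: imagine the process is killed here, so endTime is never set.
        ToyExecution stale = repository.createExecution("job");
        System.out.println("restart before cleanup: " + tryRestart(repository, "job"));

        // Mirror of the fix: mark the stale record as failed and finished.
        stale.status = "FAILED";
        stale.endTime = new Date();
        System.out.println("restart after cleanup: " + tryRestart(repository, "job"));
    }
}
```

In this model the first restart attempt is rejected and the second succeeds, which matches what I saw once the stale execution was updated before calling jobOperator.restart.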