spring 批处理调度期间出现的错误
bugs accuring during spring batch scheduling
大家好,我正在处理 spring 带调度的批处理(使用 cron 触发器)它正在工作,但存在以下错误:
- 让我们假设 cron 值每 10 秒启动一次批处理,当我启动第一个和之后,例如 3 秒我启动另一个,spring 将不会意识到 3 的间隔秒,它会同时启动它们,就像我同时触发它们一样
这是我的代码
this is the class of the job i'll launch
@Component
public class JobThread implements Runnable {
@Autowired
private JobLauncher jobLauncher;
@Autowired
@Lazy
private Job job;
public JobParameters jobParameters;
private Logger log = Logger.getLogger(JobThread.class);
public synchronized void runBatch() {
jobParameters = new JobParametersBuilder().addLong("LaunchTime", System.currentTimeMillis())
.addString("TenantID", BatchController.getCurrentTenant().get()).toJobParameters();
try {
JobExecution jobExecution = jobLauncher.run(job, jobParameters);
log.info("Job's Status:::" + jobExecution.getStatus());
} catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException
| JobParametersInvalidException e) {
e.printStackTrace();
}
}
@Override
public void run() {
this.runBatch();
}
}
the controller which will invoke the job
@RestController
@RequestMapping("tenant/batch")
public class BatchController {
@Autowired
private ThreadPoolTaskScheduler taskScheduler;
@Autowired
@Qualifier("threadPoolTaskExecutor")
private ThreadPoolTaskExecutor taskExecutor;
@Autowired
private JobThread jobThread;
private static ThreadLocal<String> currentTenant;
@PostMapping("/schedule")
public void setBatch(@RequestBody BatchBean cron) {
currentTenant = new ThreadLocal<String>() {
@Override
protected String initialValue() {
new TenantContext();
return TenantContext.getCurrentTenant();
}
};
//cron = "*/10 * * * * *";
taskScheduler.schedule(taskExecutor.createThread(jobThread), new CronTrigger(cron.getCron()));
}
我希望我已经足够清楚了
提前致谢
问题是您的代码不是线程安全的,因此存在潜在危险。此外,您的 ThreadLocal
不会工作,因为该作业将在不同的线程中执行,并且无法访问 ThreadLocal
。
- 不要在控制器中重新创建
ThreadLocal
。定义一次,就这样吧。
- 你的
JobThread
是一个保持状态(参数)的单例,所以只有最后一个会保留。
- 接口编程
TaskScheduler
而不是具体实现
- 不要创建话题,因为您的
JobThread
已经 Runnable
。
- 不用你的
JobThread
做单例,根据需要构造一个新的,传入需要的参数。
您的 JobThread
应该看起来像这样。
public class JobThread implements Runnable {
private final Logger log = Logger.getLogger(JobThread.class);
private final JobLauncher jobLauncher;
private final Job job;
private final String tenant;
public JobThread(JobLauncher launcher, Job job, String tenant) {
this.jobLauncher=launcher;
this.job=job;
this.tenant=tenant;
}
@Override
public void run() {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("LaunchTime", System.currentTimeMillis())
.addString("TenantID", tenant);
try {
JobExecution jobExecution = jobLauncher.run(job, jobParameters);
log.info("Job's Status:::" + jobExecution.getStatus());
} catch (JobExecutionException e) {
log.error(e.getMessage(), e);
}
}
}
然后在您的控制器中注入所需的 JobLauncer
和 Job
。需要时构造一个新的JobThread
并传入需要的信息。
@RestController
@RequestMapping("tenant/batch")
public class BatchController {
@Autowired
private TaskScheduler taskScheduler;
@Autowired
private JobLauncher jobLauncher;
@Autowired
@Lazy
private Job job;
@PostMapping("/schedule")
public void setBatch(@RequestBody BatchBean cron) {
//cron = "*/10 * * * * *";
String tenant = TenantContext.getCurrentTenant();
JobThread task = new JobThread(this.jobLauncher, this.job, tenant);
taskScheduler.schedule(task, new CronTrigger(cron.getCron()));
}
最后一点,System.currentTimeMillis
的精度可能与您的 OS/System/Architecture 不同。参见所述方法的javadoc。
大家好,我正在处理 spring 带调度的批处理(使用 cron 触发器)它正在工作,但存在以下错误:
- 让我们假设 cron 值每 10 秒启动一次批处理,当我启动第一个和之后,例如 3 秒我启动另一个,spring 将不会意识到 3 的间隔秒,它会同时启动它们,就像我同时触发它们一样
这是我的代码
this is the class of the job i'll launch
@Component
public class JobThread implements Runnable {
@Autowired
private JobLauncher jobLauncher;
@Autowired
@Lazy
private Job job;
public JobParameters jobParameters;
private Logger log = Logger.getLogger(JobThread.class);
public synchronized void runBatch() {
jobParameters = new JobParametersBuilder().addLong("LaunchTime", System.currentTimeMillis())
.addString("TenantID", BatchController.getCurrentTenant().get()).toJobParameters();
try {
JobExecution jobExecution = jobLauncher.run(job, jobParameters);
log.info("Job's Status:::" + jobExecution.getStatus());
} catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException
| JobParametersInvalidException e) {
e.printStackTrace();
}
}
@Override
public void run() {
this.runBatch();
}
}
the controller which will invoke the job
@RestController
@RequestMapping("tenant/batch")
public class BatchController {
@Autowired
private ThreadPoolTaskScheduler taskScheduler;
@Autowired
@Qualifier("threadPoolTaskExecutor")
private ThreadPoolTaskExecutor taskExecutor;
@Autowired
private JobThread jobThread;
private static ThreadLocal<String> currentTenant;
@PostMapping("/schedule")
public void setBatch(@RequestBody BatchBean cron) {
currentTenant = new ThreadLocal<String>() {
@Override
protected String initialValue() {
new TenantContext();
return TenantContext.getCurrentTenant();
}
};
//cron = "*/10 * * * * *";
taskScheduler.schedule(taskExecutor.createThread(jobThread), new CronTrigger(cron.getCron()));
}
我希望我已经足够清楚了 提前致谢
问题是您的代码不是线程安全的,因此存在潜在危险。此外,您的 ThreadLocal
不会工作,因为该作业将在不同的线程中执行,并且无法访问 ThreadLocal
。
- 不要在控制器中重新创建
ThreadLocal
。定义一次,就这样吧。 - 你的
JobThread
是一个保持状态(参数)的单例,所以只有最后一个会保留。 - 接口编程
TaskScheduler
而不是具体实现 - 不要创建话题,因为您的
JobThread
已经Runnable
。 - 不用你的
JobThread
做单例,根据需要构造一个新的,传入需要的参数。
您的 JobThread
应该看起来像这样。
public class JobThread implements Runnable {
private final Logger log = Logger.getLogger(JobThread.class);
private final JobLauncher jobLauncher;
private final Job job;
private final String tenant;
public JobThread(JobLauncher launcher, Job job, String tenant) {
this.jobLauncher=launcher;
this.job=job;
this.tenant=tenant;
}
@Override
public void run() {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("LaunchTime", System.currentTimeMillis())
.addString("TenantID", tenant);
try {
JobExecution jobExecution = jobLauncher.run(job, jobParameters);
log.info("Job's Status:::" + jobExecution.getStatus());
} catch (JobExecutionException e) {
log.error(e.getMessage(), e);
}
}
}
然后在您的控制器中注入所需的 JobLauncer
和 Job
。需要时构造一个新的JobThread
并传入需要的信息。
@RestController
@RequestMapping("tenant/batch")
public class BatchController {
@Autowired
private TaskScheduler taskScheduler;
@Autowired
private JobLauncher jobLauncher;
@Autowired
@Lazy
private Job job;
@PostMapping("/schedule")
public void setBatch(@RequestBody BatchBean cron) {
//cron = "*/10 * * * * *";
String tenant = TenantContext.getCurrentTenant();
JobThread task = new JobThread(this.jobLauncher, this.job, tenant);
taskScheduler.schedule(task, new CronTrigger(cron.getCron()));
}
最后一点,System.currentTimeMillis
的精度可能与您的 OS/System/Architecture 不同。参见所述方法的javadoc。