如何在 spring 中使用 EnableScheduling 注释在运行时重新启动计划任务?
How to restart scheduled task on runtime with EnableScheduling annotation in spring?
我一直在研究如何使用 Java 8 和 spring 在运行时更改作业的频率。 This question 非常有用,但并没有完全解决我的问题。
我现在可以配置下一次执行作业的日期。但是如果设置延迟为1年,那么我需要等待1年才能考虑新配置。
我的想法是,如果更改了配置值(因此来自另一个 class),则停止计划任务。然后重新计算下一次应该执行任务的时间。也许有更简单的方法。
这是我目前的代码。
@Configuration
@EnableScheduling
public class RequestSchedulerConfig implements SchedulingConfigurer {
@Autowired
SchedulerConfigService schedulerConfigService;
@Bean
public RequestScheduler myBean() {
return new RequestScheduler();
}
@Bean(destroyMethod = "shutdown")
public Executor taskExecutor() {
return Executors.newScheduledThreadPool(100);
}
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.setScheduler(taskExecutor());
taskRegistrar.addTriggerTask(
new Runnable() {
@Override public void run() {
myBean().startReplenishmentComputation();
}
},
new Trigger() {
@Override public Date nextExecutionTime(TriggerContext triggerContext) {
Duration d = schedulerConfigService.getIntervalFromDB();
return DateTime.now().plus(d).toDate();
}
}
);
}
}
这就是我想做的。
@RestController
@RequestMapping("/api/config/scheduler")
public class RequestSchedulerController {
@Autowired
ApplicationConfigWrapper applicationConfigWrapper;
@RequestMapping("/set/")
@ResponseBody
public String setRequestSchedulerConfig(@RequestParam(value = "frequency", defaultValue = "") final String frequencyInSeconds){
changeValueInDb(frequencyInSeconds);
myJob.restart();
return "Yeah";
}
}
- 创建一个被注入的单例 bean
TaskScheduler
. This will hold as state variables all ScheduledFuture
s,如 private ScheduledFuture job1;
- 部署时,从数据库加载所有计划数据并启动作业,填写所有状态变量,如
job1
。
- 调度数据改变时,cancel相应的
Future
(例如job1
),然后用新的调度数据重新启动。
这里的关键思想是在创建 Future
时控制它们,以便将它们保存在一些状态变量中,这样当调度数据发生变化时,您可以取消它们。
这是工作代码:
applicationContext.xml
<task:annotation-driven />
<task:scheduler id="infScheduler" pool-size="10"/>
持有 Future
s
的单例 bean
@Component
public class SchedulerServiceImpl implements SchedulerService {
private static final Logger logger = LoggerFactory.getLogger(SchedulerServiceImpl.class);
@Autowired
@Qualifier(value="infScheduler")
private TaskScheduler taskScheduler;
@Autowired
private MyService myService;
private ScheduledFuture job1;//for other jobs you can add new private state variables
//Call this on deployment from the ScheduleDataRepository and everytime when schedule data changes.
@Override
public synchronized void scheduleJob(int jobNr, long newRate) {//you are free to change/add new scheduling data, but suppose for now you only want to change the rate
if (jobNr == 1) {//instead of if/else you could use a map with all job data
if (job1 != null) {//job was already scheduled, we have to cancel it
job1.cancel(true);
}
//reschedule the same method with a new rate
job1 = taskScheduler.scheduleAtFixedRate(new ScheduledMethodRunnable(myService, "methodInMyServiceToReschedule"), newRate);
}
}
}
一种简单的方法是只添加新任务,而不是尝试取消或重新启动调度程序。
每次配置更改时,只需添加一个具有新配置的新任务。
然后,每当任务 运行s 时,它必须首先检查一些状态(通过查询数据库,或在并发映射中查找,或其他)来决定它是否是最新版本。如果是,那么它应该继续。否则,它应该立即结束。
唯一的缺点是,如果您经常更改作业配置,而不是 运行,那么计划任务列表当然会在内存中不断增长。
使用 Set<ScheduledTask> ScheduledTaskRegistrar.getScheduledTasks()
获取所有计划任务并调用 ScheduledTask::cancel()
怎么样?
或者可能执行 ThreadPoolTaskScheduler::shutdown()
并重新创建 ThreadPoolTaskScheduler 并在 ScheduledTaskRegistrar 中再次设置它?
以下是 this code 的改进版本,似乎是基于 Spring Boot 的工作 POC。您可以根据 table 配置多次启动和停止计划任务。但是您不能从停止的地方开始停止的作业。
1) 在主要 class 中,确保调度已启用,并可能配置一个大小大于 1 的 ThreadPoolTaskScheduler,以便计划的任务可以 运行 并行。
@SpringBootApplication
@EnableScheduling
@Bean
public TaskScheduler poolScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("ThreadPoolTaskScheduler");
scheduler.setPoolSize(10);
scheduler.initialize();
return scheduler;
}
2) 包含计划配置的对象,例如在这种情况下,类似 cron 的配置:
public class ScheduleConfigVo {
//some constructors, getter/setters
private String taskName;
private String configValue; // like */10 * * * * * for cron
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ScheduleConfigVo that = (ScheduleConfigVo) o;
return taskName.equals(that.taskName) &&
configValue.equals(that.configValue) ;
}
@Override
public int hashCode() {
return Objects.hash(taskName, configValue);
}
}
因为要进行对象比较,所以需要equals和hashCode。
3)我用的是mybatis,所以预定的选择是这样的:
@Mapper
public interface ScheduleConfigMapper {
List<ScheduleConfigVo> getAllConfigure();
}
和
public class ScheduleConfigMapperImpl implements ScheduleConfigMapper {
@Override
public List<ScheduleConfigVo>getAllConfigure() {
return getAllConfigure();
}
}
使用简单的配套 mybatis xml 配置(此处未显示,但可以在互联网上的任何地方找到)。
4) 创建一个 table 并用记录填充它
CREATE TABLE "SCHEDULER"
( "CLASS_NAME" VARCHAR2(100), --PK
"VALUE" VARCHAR2(20 BYTE) --not null
)
并用记录填充它 class_name=Task1, value=*/10 * * * * * 等 => 运行 就像每十秒一个 cron
5) 调度器部分:
@Service
public class DynamicScheduler implements SchedulingConfigurer {
@Autowired
private ScheduleConfigMapper repo;
@Autowired
private Runnable [] tsks;
@Autowired
private TaskScheduler tsch;
private ScheduledTaskRegistrar scheduledTaskRegistrar;
private ScheduledFuture future;
private Map<String, ScheduledFuture> futureMap = new ConcurrentHashMap<>(); // for the moment it has only class name
List<ScheduleConfigVo> oldList = new ArrayList<>();
List<ScheduleConfigVo> newList;
List<ScheduleConfigVo> addList = new ArrayList<>();
List<ScheduleConfigVo> removeList = new ArrayList<>();
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
if (scheduledTaskRegistrar == null) {
scheduledTaskRegistrar = taskRegistrar;
}
if (taskRegistrar.getScheduler() == null) {
taskRegistrar.setScheduler(tsch);
}
updateJobList();
}
@Scheduled(fixedDelay = 5000)
public void updateJobList() {
newList = repo.getConfigure()== null ? new ArrayList<>() : repo.getConfigure();
addList.clear();
removeList.clear();
if (!newList.isEmpty()) {
//compare new List with oldList
if (!oldList.isEmpty()) {
addList = newList.stream().filter(e -> !oldList.contains(e)).collect(Collectors.toList());
removeList = oldList.stream().filter(e -> !newList.contains(e)).collect(Collectors.toList());
} else {
addList = new ArrayList<>(newList); // nothing to remove
}
} else { // nothing to add
if (!oldList.isEmpty()) {
removeList = new ArrayList<>(oldList);
} // else removeList = 0
}
log.info("addList="+ addList.toString());
log.info("removeList="+ removeList.toString());
//re-schedule here
for ( ScheduleConfigVo conf : removeList ) {
if ( !futureMap.isEmpty()){
future = futureMap.get(conf.getTaskName());
if (future != null) {
log.info("cancelling task "+conf.getTaskName() +" ...");
future.cancel(true);
log.info(conf.getTaskName() + " isCancelled = " + future.isCancelled());
futureMap.remove(conf.getTaskName());
}
}
}
for ( ScheduleConfigVo conf : addList ) {
for (Runnable o: tsks) {
if (o.getClass().getName().contains(conf.getTaskName())) { // o has fqn whereas conf has class name only
log.info("find " + o.getClass().getName() + " to add to scheduler");
future = scheduledTaskRegistrar.getScheduler().schedule(o, (TriggerContext a) -> {
CronTrigger crontrigger = new CronTrigger(conf.getConfigValue());
return crontrigger.nextExecutionTime(a);
});
futureMap.put(o.getClass().getName().substring(o.getClass().getName().lastIndexOf('.')+1), future);
}
}
}
oldList.clear();
oldList= newList;
}
6) 一个或多个实际执行 cron 工作的可运行任务,例如:
@Slf4j
@Service
public class Task1 implements Runnable {
@Override
public void run() {
log.info("Task1 is running...");
}
}
应用程序启动后,cron 作业将 运行。 运行ning 时间间隔随着 table 中值的变化而变化,作业随着 table 条目的删除而停止。
请注意,如果作业 运行 比 cron 间隔长,则下一个 运行 是在上一个作业完成之后。您可以通过在上面的 Task1 中添加例如 sleep 15 秒来模拟这种情况以进行测试。有时在被取消后,一项工作可能仍然 运行 直到完成。
***只是编辑补充一下,如果喜欢lambda的人可以节省一些行,上面的removeList和addList可以修改为:
removeList.stream().filter(conf -> {
future = futureMap.get(conf.getTaskName());
return future != null;
}).forEach((conf) -> {
log.info("cancelling task " + conf.getTaskName() + " ...");
future.cancel(true);
log.info(conf.getTaskName() + " isCancelled = " + future.isCancelled());
});
和
Arrays.stream(tsks).forEach(task -> {
addList.stream().filter(conf -> task.getClass().getName().contains(conf.getTaskName())).forEach(conf -> {
log.info("find " + task.getClass().getName() + " to add to scheduler");
future = scheduledTaskRegistrar.getScheduler().schedule(task, (TriggerContext a) -> {
CronTrigger crontrigger = new CronTrigger(conf.getConfigValue());
return crontrigger.nextExecutionTime(a);
});
futureMap.put(task.getClass().getName().substring(task.getClass().getName().lastIndexOf('.') + 1), future);
});
});
我一直在研究如何使用 Java 8 和 spring 在运行时更改作业的频率。 This question 非常有用,但并没有完全解决我的问题。
我现在可以配置下一次执行作业的日期。但是如果设置延迟为1年,那么我需要等待1年才能考虑新配置。
我的想法是,如果更改了配置值(因此来自另一个 class),则停止计划任务。然后重新计算下一次应该执行任务的时间。也许有更简单的方法。
这是我目前的代码。
@Configuration
@EnableScheduling
public class RequestSchedulerConfig implements SchedulingConfigurer {
@Autowired
SchedulerConfigService schedulerConfigService;
@Bean
public RequestScheduler myBean() {
return new RequestScheduler();
}
@Bean(destroyMethod = "shutdown")
public Executor taskExecutor() {
return Executors.newScheduledThreadPool(100);
}
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.setScheduler(taskExecutor());
taskRegistrar.addTriggerTask(
new Runnable() {
@Override public void run() {
myBean().startReplenishmentComputation();
}
},
new Trigger() {
@Override public Date nextExecutionTime(TriggerContext triggerContext) {
Duration d = schedulerConfigService.getIntervalFromDB();
return DateTime.now().plus(d).toDate();
}
}
);
}
}
这就是我想做的。
@RestController
@RequestMapping("/api/config/scheduler")
public class RequestSchedulerController {
@Autowired
ApplicationConfigWrapper applicationConfigWrapper;
@RequestMapping("/set/")
@ResponseBody
public String setRequestSchedulerConfig(@RequestParam(value = "frequency", defaultValue = "") final String frequencyInSeconds){
changeValueInDb(frequencyInSeconds);
myJob.restart();
return "Yeah";
}
}
- 创建一个被注入的单例 bean
TaskScheduler
. This will hold as state variables allScheduledFuture
s,如private ScheduledFuture job1;
- 部署时,从数据库加载所有计划数据并启动作业,填写所有状态变量,如
job1
。 - 调度数据改变时,cancel相应的
Future
(例如job1
),然后用新的调度数据重新启动。
这里的关键思想是在创建 Future
时控制它们,以便将它们保存在一些状态变量中,这样当调度数据发生变化时,您可以取消它们。
这是工作代码:
applicationContext.xml
<task:annotation-driven />
<task:scheduler id="infScheduler" pool-size="10"/>
持有 Future
s
@Component
public class SchedulerServiceImpl implements SchedulerService {
private static final Logger logger = LoggerFactory.getLogger(SchedulerServiceImpl.class);
@Autowired
@Qualifier(value="infScheduler")
private TaskScheduler taskScheduler;
@Autowired
private MyService myService;
private ScheduledFuture job1;//for other jobs you can add new private state variables
//Call this on deployment from the ScheduleDataRepository and everytime when schedule data changes.
@Override
public synchronized void scheduleJob(int jobNr, long newRate) {//you are free to change/add new scheduling data, but suppose for now you only want to change the rate
if (jobNr == 1) {//instead of if/else you could use a map with all job data
if (job1 != null) {//job was already scheduled, we have to cancel it
job1.cancel(true);
}
//reschedule the same method with a new rate
job1 = taskScheduler.scheduleAtFixedRate(new ScheduledMethodRunnable(myService, "methodInMyServiceToReschedule"), newRate);
}
}
}
一种简单的方法是只添加新任务,而不是尝试取消或重新启动调度程序。
每次配置更改时,只需添加一个具有新配置的新任务。
然后,每当任务 运行s 时,它必须首先检查一些状态(通过查询数据库,或在并发映射中查找,或其他)来决定它是否是最新版本。如果是,那么它应该继续。否则,它应该立即结束。
唯一的缺点是,如果您经常更改作业配置,而不是 运行,那么计划任务列表当然会在内存中不断增长。
使用 Set<ScheduledTask> ScheduledTaskRegistrar.getScheduledTasks()
获取所有计划任务并调用 ScheduledTask::cancel()
怎么样?
或者可能执行 ThreadPoolTaskScheduler::shutdown()
并重新创建 ThreadPoolTaskScheduler 并在 ScheduledTaskRegistrar 中再次设置它?
以下是 this code 的改进版本,似乎是基于 Spring Boot 的工作 POC。您可以根据 table 配置多次启动和停止计划任务。但是您不能从停止的地方开始停止的作业。
1) 在主要 class 中,确保调度已启用,并可能配置一个大小大于 1 的 ThreadPoolTaskScheduler,以便计划的任务可以 运行 并行。
@SpringBootApplication
@EnableScheduling
@Bean
public TaskScheduler poolScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("ThreadPoolTaskScheduler");
scheduler.setPoolSize(10);
scheduler.initialize();
return scheduler;
}
2) 包含计划配置的对象,例如在这种情况下,类似 cron 的配置:
public class ScheduleConfigVo {
//some constructors, getter/setters
private String taskName;
private String configValue; // like */10 * * * * * for cron
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ScheduleConfigVo that = (ScheduleConfigVo) o;
return taskName.equals(that.taskName) &&
configValue.equals(that.configValue) ;
}
@Override
public int hashCode() {
return Objects.hash(taskName, configValue);
}
}
因为要进行对象比较,所以需要equals和hashCode。
3)我用的是mybatis,所以预定的选择是这样的:
@Mapper
public interface ScheduleConfigMapper {
List<ScheduleConfigVo> getAllConfigure();
}
和
public class ScheduleConfigMapperImpl implements ScheduleConfigMapper {
@Override
public List<ScheduleConfigVo>getAllConfigure() {
return getAllConfigure();
}
}
使用简单的配套 mybatis xml 配置(此处未显示,但可以在互联网上的任何地方找到)。
4) 创建一个 table 并用记录填充它
CREATE TABLE "SCHEDULER"
( "CLASS_NAME" VARCHAR2(100), --PK
"VALUE" VARCHAR2(20 BYTE) --not null
)
并用记录填充它 class_name=Task1, value=*/10 * * * * * 等 => 运行 就像每十秒一个 cron
5) 调度器部分:
@Service
public class DynamicScheduler implements SchedulingConfigurer {
@Autowired
private ScheduleConfigMapper repo;
@Autowired
private Runnable [] tsks;
@Autowired
private TaskScheduler tsch;
private ScheduledTaskRegistrar scheduledTaskRegistrar;
private ScheduledFuture future;
private Map<String, ScheduledFuture> futureMap = new ConcurrentHashMap<>(); // for the moment it has only class name
List<ScheduleConfigVo> oldList = new ArrayList<>();
List<ScheduleConfigVo> newList;
List<ScheduleConfigVo> addList = new ArrayList<>();
List<ScheduleConfigVo> removeList = new ArrayList<>();
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
if (scheduledTaskRegistrar == null) {
scheduledTaskRegistrar = taskRegistrar;
}
if (taskRegistrar.getScheduler() == null) {
taskRegistrar.setScheduler(tsch);
}
updateJobList();
}
@Scheduled(fixedDelay = 5000)
public void updateJobList() {
newList = repo.getConfigure()== null ? new ArrayList<>() : repo.getConfigure();
addList.clear();
removeList.clear();
if (!newList.isEmpty()) {
//compare new List with oldList
if (!oldList.isEmpty()) {
addList = newList.stream().filter(e -> !oldList.contains(e)).collect(Collectors.toList());
removeList = oldList.stream().filter(e -> !newList.contains(e)).collect(Collectors.toList());
} else {
addList = new ArrayList<>(newList); // nothing to remove
}
} else { // nothing to add
if (!oldList.isEmpty()) {
removeList = new ArrayList<>(oldList);
} // else removeList = 0
}
log.info("addList="+ addList.toString());
log.info("removeList="+ removeList.toString());
//re-schedule here
for ( ScheduleConfigVo conf : removeList ) {
if ( !futureMap.isEmpty()){
future = futureMap.get(conf.getTaskName());
if (future != null) {
log.info("cancelling task "+conf.getTaskName() +" ...");
future.cancel(true);
log.info(conf.getTaskName() + " isCancelled = " + future.isCancelled());
futureMap.remove(conf.getTaskName());
}
}
}
for ( ScheduleConfigVo conf : addList ) {
for (Runnable o: tsks) {
if (o.getClass().getName().contains(conf.getTaskName())) { // o has fqn whereas conf has class name only
log.info("find " + o.getClass().getName() + " to add to scheduler");
future = scheduledTaskRegistrar.getScheduler().schedule(o, (TriggerContext a) -> {
CronTrigger crontrigger = new CronTrigger(conf.getConfigValue());
return crontrigger.nextExecutionTime(a);
});
futureMap.put(o.getClass().getName().substring(o.getClass().getName().lastIndexOf('.')+1), future);
}
}
}
oldList.clear();
oldList= newList;
}
6) 一个或多个实际执行 cron 工作的可运行任务,例如:
@Slf4j
@Service
public class Task1 implements Runnable {
@Override
public void run() {
log.info("Task1 is running...");
}
}
应用程序启动后,cron 作业将 运行。 运行ning 时间间隔随着 table 中值的变化而变化,作业随着 table 条目的删除而停止。
请注意,如果作业 运行 比 cron 间隔长,则下一个 运行 是在上一个作业完成之后。您可以通过在上面的 Task1 中添加例如 sleep 15 秒来模拟这种情况以进行测试。有时在被取消后,一项工作可能仍然 运行 直到完成。
***只是编辑补充一下,如果喜欢lambda的人可以节省一些行,上面的removeList和addList可以修改为:
removeList.stream().filter(conf -> {
future = futureMap.get(conf.getTaskName());
return future != null;
}).forEach((conf) -> {
log.info("cancelling task " + conf.getTaskName() + " ...");
future.cancel(true);
log.info(conf.getTaskName() + " isCancelled = " + future.isCancelled());
});
和
Arrays.stream(tsks).forEach(task -> {
addList.stream().filter(conf -> task.getClass().getName().contains(conf.getTaskName())).forEach(conf -> {
log.info("find " + task.getClass().getName() + " to add to scheduler");
future = scheduledTaskRegistrar.getScheduler().schedule(task, (TriggerContext a) -> {
CronTrigger crontrigger = new CronTrigger(conf.getConfigValue());
return crontrigger.nextExecutionTime(a);
});
futureMap.put(task.getClass().getName().substring(task.getClass().getName().lastIndexOf('.') + 1), future);
});
});