如何在 spring 中使用 EnableScheduling 注释在运行时重新启动计划任务?

How to restart scheduled task on runtime with EnableScheduling annotation in spring?

我一直在研究如何使用 Java 8 和 spring 在运行时更改作业的频率。 This question 非常有用,但并没有完全解决我的问题。

我现在可以配置下一次执行作业的日期。但是如果设置延迟为1年,那么我需要等待1年才能考虑新配置。

我的想法是,如果更改了配置值(因此来自另一个 class),则停止计划任务。然后重新计算下一次应该执行任务的时间。也许有更简单的方法。

这是我目前的代码。

@Configuration
@EnableScheduling
public class RequestSchedulerConfig implements SchedulingConfigurer {

    @Autowired
    SchedulerConfigService schedulerConfigService;

    @Bean
    public RequestScheduler myBean() {
        return new RequestScheduler();
    }

    @Bean(destroyMethod = "shutdown")
    public Executor taskExecutor() {
        return Executors.newScheduledThreadPool(100);
    }

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        taskRegistrar.setScheduler(taskExecutor());
        taskRegistrar.addTriggerTask(
                new Runnable() {
                    @Override public void run() {
                        myBean().startReplenishmentComputation();
                    }
                },
                new Trigger() {
                    @Override public Date nextExecutionTime(TriggerContext triggerContext) {
                        Duration d = schedulerConfigService.getIntervalFromDB();
                        return DateTime.now().plus(d).toDate();
                    }
                }
        );
    }
}

这就是我想做的。

@RestController
@RequestMapping("/api/config/scheduler")
public class RequestSchedulerController {

    @Autowired
    ApplicationConfigWrapper applicationConfigWrapper;

    @RequestMapping("/set/")
    @ResponseBody
    public String setRequestSchedulerConfig(@RequestParam(value = "frequency", defaultValue = "") final String frequencyInSeconds){
        changeValueInDb(frequencyInSeconds);
        myJob.restart();
        return "Yeah";
    }

}
  1. 创建一个被注入的单例 bean TaskScheduler. This will hold as state variables all ScheduledFutures,如 private ScheduledFuture job1;
  2. 部署时,从数据库加载所有计划数据并启动作业,填写所有状态变量,如 job1
  3. 调度数据改变时,cancel相应的Future(例如job1),然后用新的调度数据重新启动。

这里的关键思想是在创建 Future 时控制它们,以便将它们保存在一些状态变量中,这样当调度数据发生变化时,您可以取消它们。

这是工作代码:

applicationContext.xml

<task:annotation-driven />
<task:scheduler id="infScheduler" pool-size="10"/>

持有 Futures

的单例 bean
@Component
public class SchedulerServiceImpl implements SchedulerService {

        private static final Logger logger = LoggerFactory.getLogger(SchedulerServiceImpl.class);

        @Autowired
        @Qualifier(value="infScheduler")
        private TaskScheduler taskScheduler;

        @Autowired
        private MyService myService;

        private ScheduledFuture job1;//for other jobs you can add new private state variables

        //Call this on deployment from the ScheduleDataRepository and everytime when schedule data changes.
        @Override
        public synchronized void scheduleJob(int jobNr, long newRate) {//you are free to change/add new scheduling data, but suppose for now you only want to change the rate
                if (jobNr == 1) {//instead of if/else you could use a map with all job data
                        if (job1 != null) {//job was already scheduled, we have to cancel it
                                job1.cancel(true);
                        }
                        //reschedule the same method with a new rate
                        job1 = taskScheduler.scheduleAtFixedRate(new ScheduledMethodRunnable(myService, "methodInMyServiceToReschedule"), newRate);
                }
        }
}

一种简单的方法是只添加新任务,而不是尝试取消或重新启动调度程序。

每次配置更改时,只需添加一个具有新配置的新任务。

然后,每当任务 运行s 时,它必须首先检查一些状态(通过查询数据库,或在并发映射中查找,或其他)来决定它是否是最新版本。如果是,那么它应该继续。否则,它应该立即结束。

唯一的缺点是,如果您经常更改作业配置,而不是 运行,那么计划任务列表当然会在内存中不断增长。

使用 Set<ScheduledTask> ScheduledTaskRegistrar.getScheduledTasks() 获取所有计划任务并调用 ScheduledTask::cancel() 怎么样? 或者可能执行 ThreadPoolTaskScheduler::shutdown() 并重新创建 ThreadPoolTask​​Scheduler 并在 ScheduledTaskRegistrar 中再次设置它?

以下是 this code 的改进版本,似乎是基于 Spring Boot 的工作 POC。您可以根据 table 配置多次启动和停止计划任务。但是您不能从停止的地方开始停止的作业。

1) 在主要 class 中,确保调度已启用,并可能配置一个大小大于 1 的 ThreadPoolTask​​Scheduler,以便计划的任务可以 运行 并行。

@SpringBootApplication
@EnableScheduling

 @Bean
public TaskScheduler poolScheduler() {
    ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
    scheduler.setThreadNamePrefix("ThreadPoolTaskScheduler");
    scheduler.setPoolSize(10);
    scheduler.initialize();
    return scheduler;
}

2) 包含计划配置的对象,例如在这种情况下,类似 cron 的配置:

public class ScheduleConfigVo {
//some constructors, getter/setters
    private String  taskName;
    private String  configValue; // like */10 * * * * * for cron

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        ScheduleConfigVo that = (ScheduleConfigVo) o;
        return taskName.equals(that.taskName) &&
            configValue.equals(that.configValue) ;
    }

    @Override
    public int hashCode() {
        return Objects.hash(taskName, configValue);
    }
}

因为要进行对象比较,所以需要equals和hashCode。

3)我用的是mybatis,所以预定的选择是这样的:

@Mapper
public interface ScheduleConfigMapper {
    List<ScheduleConfigVo> getAllConfigure();
}

public class ScheduleConfigMapperImpl implements ScheduleConfigMapper {
    @Override
    public List<ScheduleConfigVo>getAllConfigure() {
        return getAllConfigure();
    }
}

使用简单的配套 mybatis xml 配置(此处未显示,但可以在互联网上的任何地方找到)。

4) 创建一个 table 并用记录填充它

CREATE TABLE "SCHEDULER" 
( "CLASS_NAME" VARCHAR2(100), --PK
"VALUE" VARCHAR2(20 BYTE) --not null
)

并用记录填充它 class_name=Task1, value=*/10 * * * * * 等 => 运行 就像每十秒一个 cron

5) 调度器部分:

@Service
public class DynamicScheduler implements SchedulingConfigurer {

@Autowired
private ScheduleConfigMapper repo;

@Autowired
private Runnable [] tsks;

@Autowired
private TaskScheduler tsch;

private ScheduledTaskRegistrar scheduledTaskRegistrar;
private ScheduledFuture future;

private Map<String, ScheduledFuture> futureMap = new ConcurrentHashMap<>(); // for the moment it has only class name
List<ScheduleConfigVo> oldList = new ArrayList<>();
List<ScheduleConfigVo> newList;
List<ScheduleConfigVo> addList = new ArrayList<>();
List<ScheduleConfigVo> removeList = new ArrayList<>();

@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
    if (scheduledTaskRegistrar == null) {
        scheduledTaskRegistrar = taskRegistrar;
    }
    if (taskRegistrar.getScheduler() == null) {
        taskRegistrar.setScheduler(tsch);
    }
    updateJobList();

}

@Scheduled(fixedDelay = 5000)
public void updateJobList() {
    newList = repo.getConfigure()== null ? new ArrayList<>() : repo.getConfigure();
    addList.clear();
    removeList.clear();

    if (!newList.isEmpty()) {
        //compare new List with oldList
        if (!oldList.isEmpty()) {
            addList = newList.stream().filter(e -> !oldList.contains(e)).collect(Collectors.toList());
            removeList = oldList.stream().filter(e -> !newList.contains(e)).collect(Collectors.toList());
        } else {
            addList = new ArrayList<>(newList); // nothing to remove
        }
    } else { // nothing to add
        if (!oldList.isEmpty()) {
            removeList = new ArrayList<>(oldList);
        } // else removeList = 0
    }
    log.info("addList="+ addList.toString());
    log.info("removeList="+ removeList.toString());
    //re-schedule here

    for ( ScheduleConfigVo conf : removeList ) {
        if ( !futureMap.isEmpty()){
            future = futureMap.get(conf.getTaskName());
            if (future != null) {
                log.info("cancelling task "+conf.getTaskName() +" ...");
                future.cancel(true);
                log.info(conf.getTaskName() + " isCancelled = " + future.isCancelled());
                futureMap.remove(conf.getTaskName());
            }
        }
    }
    for ( ScheduleConfigVo conf : addList ) {
        for (Runnable o: tsks) {
            if (o.getClass().getName().contains(conf.getTaskName())) { // o has fqn whereas conf has class name only
                log.info("find " + o.getClass().getName() + " to add to scheduler");
                future = scheduledTaskRegistrar.getScheduler().schedule(o, (TriggerContext a) -> { 
                    CronTrigger crontrigger = new CronTrigger(conf.getConfigValue());
                    return crontrigger.nextExecutionTime(a);
                });
                futureMap.put(o.getClass().getName().substring(o.getClass().getName().lastIndexOf('.')+1), future);
            }
        }
    }

    oldList.clear();
    oldList= newList;
}

6) 一个或多个实际执行 cron 工作的可运行任务,例如:

@Slf4j
@Service
public class Task1 implements Runnable {
    @Override
    public void run() {
        log.info("Task1 is running...");
    }
}

应用程序启动后,cron 作业将 运行。 运行ning 时间间隔随着 table 中值的变化而变化,作业随着 table 条目的删除而停止。

请注意,如果作业 运行 比 cron 间隔长,则下一个 运行 是在上一个作业完成之后。您可以通过在上面的 Task1 中添加例如 sleep 15 秒来模拟这种情况以进行测试。有时在被取消后,一项工作可能仍然 运行 直到完成。

***只是编辑补充一下,如果喜欢lambda的人可以节省一些行,上面的removeList和addList可以修改为:

removeList.stream().filter(conf -> {
        future = futureMap.get(conf.getTaskName());
        return future != null;
    }).forEach((conf) -> {
        log.info("cancelling task " + conf.getTaskName() + " ...");
        future.cancel(true);
        log.info(conf.getTaskName() + " isCancelled = " + future.isCancelled());
    });

Arrays.stream(tsks).forEach(task -> {
        addList.stream().filter(conf -> task.getClass().getName().contains(conf.getTaskName())).forEach(conf -> {
            log.info("find " + task.getClass().getName() + " to add to scheduler");
            future = scheduledTaskRegistrar.getScheduler().schedule(task, (TriggerContext a) -> {
                CronTrigger crontrigger = new CronTrigger(conf.getConfigValue());
                return crontrigger.nextExecutionTime(a);
            });
            futureMap.put(task.getClass().getName().substring(task.getClass().getName().lastIndexOf('.') + 1), future);
        });
    });