我们可以强制 运行 APScheduler 作业存储中的作业吗?

Can we force run the jobs in APScheduler job store?

使用 APScheduler 版本 3.0.3。我的应用程序中的服务在内部使用 APScheduler 来安排 & 运行 作业。此外,我确实围绕实际的 APScheduler 创建了一个包装器 class(只是一个外观,有助于单元测试)。为了对这些服务进行单元测试,我可以模拟这个包装器 class。但是我有一种情况,我真的希望 APScheduler 运行 工作(在测试期间)。有什么办法可以强制 运行 这份工作?

没有立即启动作业的默认触发器,实现这个 您可以获得当前时间并将 DateTrigger 设置为作业,如下所示:

my_job.modify_job(trigger=DateTrigger(run_date=datetime.datetime.now()))

这样你会 "force" 作业到 运行,但是你必须确保在调度程序中再次插入作业,另一种选择是创建一个新作业到 [=20] =]与add_job函数

相同的函数
sched.add_job(func=your_function(),
              trigger=DateTrigger(run_date=datetime.datetime.now()))

这样您就不必执行任何额外的步骤。

另一种方法:您可以在单独的函数中编写作业逻辑。因此,您将能够在预定的工作以及其他地方调用此函数。我想这是一种更明确的方式来做你想做的事。

两种选择。

  1. 换工作next_run_time(未经个人验证,但这里说它有效:

    工作=self.scheduler.get_job(job_id=job_id) job.modify(next_run_time=datetime.now() + timedelta(seconds=5))

  2. 添加一个运行-once job来触发它(已验证):

    self.scheduler.add_job(函数=job.run, 执行者=执行者, 触发=“日期”, max_instances=1, run_date=datetime.now() + timedelta(秒数=5))