APScheduler 如何立即触发作业
APScheduler how to trigger job now
我在 Flask 应用程序中有一个 APScheduler,以一定的时间间隔发送事件。
现在我需要 "refresh" 所有作业,实际上只是 现在 如果它们不 运行 不触及定义的时间间隔就开始它们。
我试过调用 job.pause() 然后 job.resume() 什么也没用,并且使用了作业。 reschedule_job(...) 会触发它,但也会更改间隔...我不想要。
我的实际代码如下:
cron = GeventScheduler(daemon=True)
# Explicitly kick off the background thread
cron.start()
cron.add_job(_job_time, 'interval', seconds=5, id='_job_time')
cron.add_job(_job_forecast, 'interval', hours=1, id='_job_forecast_01')
@app.route("/refresh")
def refresh():
refreshed = []
for job in cron.get_jobs():
job.pause()
job.resume()
refreshed.append(job.id)
return json.dumps( {'number': len(cron.get_jobs()), 'list': refreshed} )
作为解决方法,我使用了以下方法。
总而言之,我循环遍历所有作业 cron.get_jobs()
并使用作业对象创建一次性作业到 'date' 触发器,该触发器仅在 datetime.now
触发一次,因为未指定。
def refresh():
refreshed = []
for job in cron.get_jobs():
cron.add_job(job.func, 'date', id='{0}.uniq'.format(job.id), max_instances=1)
refreshed.append(job.id)
return json.dumps( {'number': len(cron.get_jobs()), 'list': refreshed} )
您也可以直接 运行 工作职能。
for job in cron.get_jobs():
job.func()
如果您将 args 或 kwargs 传递给函数,则必须将它们从 job.args
and/or job.kwargs
中提取出来。参见 apscheduler.job.Job
我不鼓励按照接受的答案中的建议调用 job.func()
。调度程序不会意识到作业是 运行 的事实,并且会搞乱常规调度逻辑。
改为使用作业的 modify()
函数将其 next_run_time
属性 设置为 now()
:
for job in scheduler.get_jobs():
job.modify(next_run_time=datetime.now())
很晚才回答,但我认为您需要的是简单地调用“刷新”路由中的方法。
我在 Flask 应用程序中有一个 APScheduler,以一定的时间间隔发送事件。
现在我需要 "refresh" 所有作业,实际上只是 现在 如果它们不 运行 不触及定义的时间间隔就开始它们。
我试过调用 job.pause() 然后 job.resume() 什么也没用,并且使用了作业。 reschedule_job(...) 会触发它,但也会更改间隔...我不想要。
我的实际代码如下:
cron = GeventScheduler(daemon=True)
# Explicitly kick off the background thread
cron.start()
cron.add_job(_job_time, 'interval', seconds=5, id='_job_time')
cron.add_job(_job_forecast, 'interval', hours=1, id='_job_forecast_01')
@app.route("/refresh")
def refresh():
refreshed = []
for job in cron.get_jobs():
job.pause()
job.resume()
refreshed.append(job.id)
return json.dumps( {'number': len(cron.get_jobs()), 'list': refreshed} )
作为解决方法,我使用了以下方法。
总而言之,我循环遍历所有作业 cron.get_jobs()
并使用作业对象创建一次性作业到 'date' 触发器,该触发器仅在 datetime.now
触发一次,因为未指定。
def refresh():
refreshed = []
for job in cron.get_jobs():
cron.add_job(job.func, 'date', id='{0}.uniq'.format(job.id), max_instances=1)
refreshed.append(job.id)
return json.dumps( {'number': len(cron.get_jobs()), 'list': refreshed} )
您也可以直接 运行 工作职能。
for job in cron.get_jobs():
job.func()
如果您将 args 或 kwargs 传递给函数,则必须将它们从 job.args
and/or job.kwargs
中提取出来。参见 apscheduler.job.Job
我不鼓励按照接受的答案中的建议调用 job.func()
。调度程序不会意识到作业是 运行 的事实,并且会搞乱常规调度逻辑。
改为使用作业的 modify()
函数将其 next_run_time
属性 设置为 now()
:
for job in scheduler.get_jobs():
job.modify(next_run_time=datetime.now())
很晚才回答,但我认为您需要的是简单地调用“刷新”路由中的方法。