如何在 APScheduler 中动态修改作业?
How to modify a job on the fly in APScheduler?
我有一份定期工作。由于需要对其进行升级,而且我不想停止主要代码,我希望我可以 modify/update 即时完成这项工作。我将通过简单的例子来演示。
主要脚本:
from apscheduler.schedulers.blocking import BlockingScheduler
from my_job_func import my_job
if __name__ == '__main__':
scheduler = BlockingScheduler()
scheduler.add_job(my_job, 'interval', seconds=3)
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
pass
要在文件 my_job_func.py 中即时修改的作业:
def my_job():
print('Make some changes!!')
那么,是否可以在不停止主脚本的情况下即时修改或更新 my_job()?
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger
from my_job_func import my_job
def observer(job):
# you can either reschedule the job
job.reschedule(...)
# or modify it
changes = get_update() # get update from configuration
job.modify(**changes)
if __name__ == '__main__':
scheduler = BlockingScheduler()
my_job_id = scheduler.add_job(my_job, 'interval', seconds=3)
scheduler.add_job(observer, IntervalTrigger(minutes=5), args=(my_job_id,), name="observer")
scheduler.start()
您可以添加一个单独的观察员作业帮助即时修改 my_job
。
请注意,更改可能不会立即生效。您可以调整观察者作业的间隔以满足您的需要。
我有一份定期工作。由于需要对其进行升级,而且我不想停止主要代码,我希望我可以 modify/update 即时完成这项工作。我将通过简单的例子来演示。
主要脚本:
from apscheduler.schedulers.blocking import BlockingScheduler
from my_job_func import my_job
if __name__ == '__main__':
scheduler = BlockingScheduler()
scheduler.add_job(my_job, 'interval', seconds=3)
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
pass
要在文件 my_job_func.py 中即时修改的作业:
def my_job():
print('Make some changes!!')
那么,是否可以在不停止主脚本的情况下即时修改或更新 my_job()?
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger
from my_job_func import my_job
def observer(job):
# you can either reschedule the job
job.reschedule(...)
# or modify it
changes = get_update() # get update from configuration
job.modify(**changes)
if __name__ == '__main__':
scheduler = BlockingScheduler()
my_job_id = scheduler.add_job(my_job, 'interval', seconds=3)
scheduler.add_job(observer, IntervalTrigger(minutes=5), args=(my_job_id,), name="observer")
scheduler.start()
您可以添加一个单独的观察员作业帮助即时修改 my_job
。
请注意,更改可能不会立即生效。您可以调整观察者作业的间隔以满足您的需要。