APScheduler 作业未按计划启动
APScheduler job is not starting as scheduled
我正在尝试安排每分钟开始一次作业。
我在 scheduler.py
脚本中定义了调度程序:
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
executors = {
'default': ThreadPoolExecutor(10),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 5
}
scheduler = BackgroundScheduler(executors=executors,job_defaults=job_defaults)
我在模块的 __init__.py
中初始化调度程序,如下所示:
from scheduler import scheduler
scheduler.start()
我想在特定操作上启动计划作业,如下所示:
def AddJob():
dbid = repository.database.GetDbid()
job_id = 'CollectData_{0}'.format(dbid)
scheduler.scheduled_job(func=TestScheduler(),
trigger='interval',
minutes=1,
id=job_id
)
def TestScheduler():
for i in range(0,29):
starttime = time()
print "test"
sleep(1.0 - ((time() - starttime) % 1.0))
首先:当我在 python 控制台中执行 AddJob()
函数时,它会按预期开始 运行 但不在后台,控制台被阻塞,直到 TestScheduler
函数在 30 秒后结束。我期待它在后台 运行 因为它是一个后台调度程序。
第二:即使指定重复间隔为 1 分钟,作业也不会再次启动。
我错过了什么?
更新
感谢另一个线程,我发现了这个问题。错误的行是这样的:
scheduler.scheduled_job(func=TestScheduler(),
trigger='interval',
minutes=1,
id=job_id
)
我改成了:
scheduler.add_job(func=TestScheduler,
trigger='interval',
minutes=1,
id=job_id
)
TestScheduler() 变为 TestScheduler。使用 TestScheduler() 会导致函数 TestScheduler() 的结果作为 add_job() 的参数传递。
第一个问题似乎是您正在 __init__.py
中初始化调度程序,这似乎不是推荐的方法。
__init__.py
中存在的代码在首次导入特定文件夹中的模块时执行。例如,想象一下这个结构:
my_module
|--__init__.py
|--test.py
与 __init__.py
:
from scheduler import scheduler
scheduler.start()
scheduler.start()
命令在 from my_module import something
时执行。所以它要么根本不从 __init__.py
开始,要么它开始很多次(取决于你的代码的其余部分!)。
另一个问题肯定是使用scheduler.scheduled_job()
方法。如果您阅读文档 on adding jobs,您会发现推荐的方法是使用 add_job()
方法,而不是 scheduled_job()
,后者是为了方便起见的装饰器。
我建议这样:
- 保持
my_scheduler.py
不变。
- 从
__init__.py
中删除 scheduler.start()
行。
按如下方式更改您的主文件:
from my_scheduler import scheduler
if not scheduler.running: # Clause suggested by @CyrilleMODIANO
scheduler.start()
def AddJob():
dbid = repository.database.GetDbid()
job_id = 'CollectData_{0}'.format(dbid)
scheduler.add_job(
func=TestScheduler,
trigger='interval',
minutes=1,
id=job_id
)
...
我正在尝试安排每分钟开始一次作业。
我在 scheduler.py
脚本中定义了调度程序:
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
executors = {
'default': ThreadPoolExecutor(10),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 5
}
scheduler = BackgroundScheduler(executors=executors,job_defaults=job_defaults)
我在模块的 __init__.py
中初始化调度程序,如下所示:
from scheduler import scheduler
scheduler.start()
我想在特定操作上启动计划作业,如下所示:
def AddJob():
dbid = repository.database.GetDbid()
job_id = 'CollectData_{0}'.format(dbid)
scheduler.scheduled_job(func=TestScheduler(),
trigger='interval',
minutes=1,
id=job_id
)
def TestScheduler():
for i in range(0,29):
starttime = time()
print "test"
sleep(1.0 - ((time() - starttime) % 1.0))
首先:当我在 python 控制台中执行 AddJob()
函数时,它会按预期开始 运行 但不在后台,控制台被阻塞,直到 TestScheduler
函数在 30 秒后结束。我期待它在后台 运行 因为它是一个后台调度程序。
第二:即使指定重复间隔为 1 分钟,作业也不会再次启动。
我错过了什么?
更新
感谢另一个线程,我发现了这个问题。错误的行是这样的:
scheduler.scheduled_job(func=TestScheduler(),
trigger='interval',
minutes=1,
id=job_id
)
我改成了:
scheduler.add_job(func=TestScheduler,
trigger='interval',
minutes=1,
id=job_id
)
TestScheduler() 变为 TestScheduler。使用 TestScheduler() 会导致函数 TestScheduler() 的结果作为 add_job() 的参数传递。
第一个问题似乎是您正在 __init__.py
中初始化调度程序,这似乎不是推荐的方法。
__init__.py
中存在的代码在首次导入特定文件夹中的模块时执行。例如,想象一下这个结构:
my_module
|--__init__.py
|--test.py
与 __init__.py
:
from scheduler import scheduler
scheduler.start()
scheduler.start()
命令在 from my_module import something
时执行。所以它要么根本不从 __init__.py
开始,要么它开始很多次(取决于你的代码的其余部分!)。
另一个问题肯定是使用scheduler.scheduled_job()
方法。如果您阅读文档 on adding jobs,您会发现推荐的方法是使用 add_job()
方法,而不是 scheduled_job()
,后者是为了方便起见的装饰器。
我建议这样:
- 保持
my_scheduler.py
不变。 - 从
__init__.py
中删除scheduler.start()
行。 按如下方式更改您的主文件:
from my_scheduler import scheduler if not scheduler.running: # Clause suggested by @CyrilleMODIANO scheduler.start() def AddJob(): dbid = repository.database.GetDbid() job_id = 'CollectData_{0}'.format(dbid) scheduler.add_job( func=TestScheduler, trigger='interval', minutes=1, id=job_id ) ...