APScheduler 作业未按计划启动

APScheduler job is not starting as scheduled

我正在尝试安排每分钟开始一次作业。 我在 scheduler.py 脚本中定义了调度程序:

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor


executors = {
    'default': ThreadPoolExecutor(10),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 5
}

scheduler = BackgroundScheduler(executors=executors,job_defaults=job_defaults)

我在模块的 __init__.py 中初始化调度程序,如下所示:

from scheduler import scheduler

scheduler.start()

我想在特定操作上启动计划作业,如下所示:

def AddJob():
    dbid = repository.database.GetDbid()
    job_id = 'CollectData_{0}'.format(dbid)
    scheduler.scheduled_job(func=TestScheduler(),
                            trigger='interval',
                            minutes=1,
                            id=job_id
                            )

def TestScheduler():
    for i in range(0,29):
        starttime = time()
        print "test"
        sleep(1.0 - ((time() - starttime) % 1.0))

首先:当我在 python 控制台中执行 AddJob() 函数时,它会按预期开始 运行 但不在后台,控制台被阻塞,直到 TestScheduler 函数在 30 秒后结束。我期待它在后台 运行 因为它是一个后台调度程序。
第二:即使指定重复间隔为 1 分钟,作业也不会再次启动。

我错过了什么?

更新

感谢另一个线程,我发现了这个问题。错误的行是这样的:

scheduler.scheduled_job(func=TestScheduler(),
                            trigger='interval',
                            minutes=1,
                            id=job_id
                            )

我改成了:

scheduler.add_job(func=TestScheduler,
                            trigger='interval',
                            minutes=1,
                            id=job_id
                            )

TestScheduler() 变为 TestScheduler。使用 TestScheduler() 会导致函数 TestScheduler() 的结果作为 add_job() 的参数传递。

第一个问题似乎是您正在 __init__.py 中初始化调度程序,这似乎不是推荐的方法。
__init__.py 中存在的代码在首次导入特定文件夹中的模块时执行。例如,想象一下这个结构:

my_module
|--__init__.py
|--test.py

__init__.py:

from scheduler import scheduler

scheduler.start()

scheduler.start() 命令在 from my_module import something 时执行。所以它要么根本不从 __init__.py 开始,要么它开始很多次(取决于你的代码的其余部分!)。

另一个问题肯定是使用scheduler.scheduled_job()方法。如果您阅读文档 on adding jobs,您会发现推荐的方法是使用 add_job() 方法,而不是 scheduled_job(),后者是为了方便起见的装饰器。

我建议这样:

  1. 保持 my_scheduler.py 不变。
  2. __init__.py 中删除 scheduler.start() 行。
  3. 按如下方式更改您的主文件:

    from my_scheduler import scheduler
    
    if not scheduler.running: # Clause suggested by @CyrilleMODIANO
        scheduler.start()
    
    def AddJob():
        dbid = repository.database.GetDbid()
        job_id = 'CollectData_{0}'.format(dbid)
        scheduler.add_job(
            func=TestScheduler,
            trigger='interval',
            minutes=1,
            id=job_id
        )
    
    ...