如何从 apscheduler 访问 return 值?

How to access return value from apscheduler?

我不太清楚如何从 apscheduler 中的计划作业访问 return 值。作业需要在每天不同的时间 运行,我需要今天作业的 return 值来安排明天的作业。

这个 link (how to get return value from apscheduler jobs) 似乎是这个问题的最佳答案。它建议向调度程序添加一个侦听器。我添加了一个侦听器,但我不确定如何访问它的 return 值。我可以访问附加到调度程序的侦听器,但无法访问它们的输出。下面代码中的侦听器 job_runs() 将在计划作业 运行s.

时打印

此外,我知道我需要访问一个 JobExecutionEvent (https://apscheduler.readthedocs.io/en/latest/modules/events.html#module-apscheduler.events),它保存函数中的 return 值。

首先,我要访问的函数是 run_all() ,其中执行了一堆操作,但我只是 return True 对于测试用例。

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR, JobExecutionEvent
from datetime import datetime, timedelta
import logging


def run_all():
    return True


def job_runs(event):  # listener function
    if event.exception:
        print('The job did not run')
    else:
        print('The job completed @ {}'.format(datetime.now()))


def job_return_val(event):  # listener function
    return event.retval

然后,我设置调度程序、添加侦听器并添加作业。触发器设置为 运行 作业添加到调度程序 1 分钟后的函数。

  scheduler = BackgroundScheduler()
  scheduler.add_listener(job_runs, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
  scheduler.add_listener(job_return_val, EVENT_JOB_EXECUTED)
  cron_args = datetime_to_dict(datetime.now() + timedelta(minutes=1))
  job = scheduler.add_job(run_all, "cron", **cron_args)

接下来,我启动计划程序并打印计划的作业。此外,我设置了日志记录,所以我知道调度程序在哪里。

  test = scheduler.start()
  scheduler.print_jobs()
  logging.basicConfig()
  logging.getLogger('apscheduler').setLevel(logging.DEBUG)

启用日志记录后,调度程序报告作业 运行 并从调度程序中删除,如我所料。 job_runs() 将正确的输出打印到控制台。有了断点,我知道 job_return_val() 被调用了。但是,我不知道它 returns 的值被发送到哪里。该函数似乎是在另一个名为 APScheduler 的线程中调用的。我对线程了解不多,但这是有道理的。但是,我不明白该线程的输出何时 returned 到主线程。

最后,我尝试使用代码实例化 JobExceptionEvent,job_id、jobstore 和 scheduled_run_time 可从调度程序和作业的属性访问,但 JobExceptionEvent 似乎没有任何知道该事件在调度程序中是 运行。由于上一段中描述的线程,这似乎也很有意义。

如果能帮助解决这个问题,那就太好了!

listener 的 return 值未在任何地方使用(参见 code),因此 return 任何值都没有用。如果您需要根据前一个作业的值(通过事件对象在侦听器中获取)安排另一个作业,则必须在该侦听器中正确执行。

编辑:为了说明如何做到这一点(并证明这是可能的),请参阅此示例代码:

from datetime import datetime
import time

from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
from apscheduler.schedulers.background import BackgroundScheduler


def tick():
    print('Tick! The time is: %s' % datetime.now())


def tack():
    print('Tack! The time is: %s' % datetime.now())


def listener(event):
    if not event.exception:
        job = scheduler.get_job(event.job_id)
        if job.name == 'tick':
            scheduler.add_job(tack)


if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    scheduler.add_listener(listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
    scheduler.add_job(tick, 'interval', seconds=5)
    scheduler.start()

    try:
        while True:
            time.sleep(1)
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()

输出:

(venv) pasmen@nyx:~/tmp/x$ python test.py 
Tick! The time is: 2019-04-03 19:51:29.192420
Tack! The time is: 2019-04-03 19:51:29.195878
Tick! The time is: 2019-04-03 19:51:34.193145
Tack! The time is: 2019-04-03 19:51:34.194898
Tick! The time is: 2019-04-03 19:51:39.193207
Tack! The time is: 2019-04-03 19:51:39.194868
Tick! The time is: 2019-04-03 19:51:44.193223
Tack! The time is: 2019-04-03 19:51:44.195066
...

您需要的是 stateful jobs feature 才能实现。

您现在可以使用全局变量。这是一个例子:

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger

def fn():
    '''Increase `times` by one and print it.'''
    global times
    times += 1
    print(times)


sched = BlockingScheduler()
times = 0

# Execute fn() each second.
sched.add_job(fn, trigger=CronTrigger(second='*/1'))
sched.start()

我前几天遇到了同样的问题。我的第一个解决方案是使用关键字 global,但不久我就意识到这是有问题的,因为在作业外定义的变量可能会意外更改,尤其是当它们是循环中的局部变量时。

然后我也想到了使用监听器。但是传递给侦听器的回调仅将事件作为单个参数,这意味着您可以从回调中获得的唯一信息是事件本身,这极大地限制了您可以做的事情。

最后我选择将一个函数传递给要安排的任务,这对我来说很好。您真正需要做的只是使用计划任务的 return 值作为 func 的参数,如下面的代码所示。

from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime

scheduler = BlockingScheduler()

def job_with_return(a:int, b:int, callback_return):
    # You can do something heavier here with the inputs.
    result = a + b
    print(f'[in job] result is {result}')
    if callback_return:
        callback_return(result)

scheduler.add_job(func=job_with_return,
    trigger='date',
    args=(1, 2, lambda r: print(f'[out of job]: result is {r}')),
    run_date=datetime.now(),
    )

scheduler.start()

输出:

[in job] result is 3
[out of job]: result is 3

关于OP的要求,

The job needs to run at a different time each day, and I need the return value from today's job to schedule tomorrow's job.

您还可以向 func 提供调度程序以及您希望明天 运行 完成任务的时间,以便您可以在 func 中安排时间表。

from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime, timedelta

scheduler = BlockingScheduler()
today = datetime.now()
tomorrow = today + timedelta(seconds=1)

def task_today(a:int, b:int, scheduler:BlockingScheduler, callback_return, tomorrow:datetime):
    # What we have to do today is to get the result and use it to schedule tomorrow's task.
    result_today = a + b
    print(f"[{datetime.now().strftime('%H:%M:%S')}] (Today) The result is {result_today}.")
    scheduler.add_job(callback_return, 'date',
                      args=(result_today,),
                      run_date=tomorrow,
                      id='job_tomorrow')

def task_tomorrow(result_from_today:int):
    result_tomrrow = result_from_today * 2
    print(f"[{datetime.now().strftime('%H:%M:%S')}] (Tommorow) The result is {result_tomrrow}.")

scheduler.add_job(func=task_today,
                trigger='date',
                args=(1, 2, scheduler, task_tomorrow, tomorrow),
                run_date=today,
                id='job_today')
    
scheduler.start()

输出:

[22:22:40] (Today) The result is 3.
[22:22:41] (Tommorow) The result is 6.

你甚至可以用递归做一个日常任务。

from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime, timedelta

scheduler = BlockingScheduler()
today = datetime.now()

def task_daily(result_yesterday:int, day_counter:int, scheduler:BlockingScheduler, callback_return):
    # You can do something heavier here with more inputs.
    result_today = result_yesterday + 2
    day_counter += 1
    tomorrow = datetime.now() + timedelta(seconds=1)
    print(f"[{datetime.now().strftime('%H:%M:%S')}] (day {day_counter}) The result for today is {result_today}.")
    scheduler.add_job(task_daily, 'date',
                        args=(result_today, day_counter, scheduler, callback_return),
                        run_date=tomorrow)

scheduler.add_job(func=task_daily,
                trigger='date',
                args=(0, 0, scheduler, task_daily),
                run_date=today)

scheduler.start()

输出:

[22:43:17] (day 1) The result for today is 2.
[22:43:18] (day 2) The result for today is 4.
[22:43:19] (day 3) The result for today is 6.
[22:43:20] (day 4) The result for today is 8.
[22:43:21] (day 5) The result for today is 10.
[22:43:22] (day 6) The result for today is 12.
[22:43:23] (day 7) The result for today is 14.