Python APScheduler - AsyncIOScheduler 如何工作?

Python APScheduler - How does AsyncIOScheduler work?

我很难理解 AsyncIOScheduler 是如何工作的,它是如何非阻塞的?

如果我的作业正在执行阻塞函数,AsyncIOScheduler 会阻塞吗?

如果我将 AsyncIOSchedulerThreadPoolExecutor 一起使用会怎样?这是如何运作的?我可以等待作业执行吗?

基于documentation,AsyncIOScheduler 在事件循环中执行。

它是非阻塞的,因为它只会将自己添加到事件循环中并等待您启动它。

一旦事件循环启动,它将运行异步。

from apscheduler.schedulers.asyncio import AsyncIOScheduler
import asyncio

async def job():
    print('hi')

scheduler = AsyncIOScheduler()
scheduler.add_job(job, "interval", seconds=3)

scheduler.start()

asyncio.get_event_loop().run_forever()

输出

Run time of job "job (trigger: interval[0:00:03], next run at: 2020-07-27 14:06:39 -03)" was missed by 0:00:02.542515
hi
hi

使用一些 Internet 资源,我发现了一些有用的事实。希望对你有帮助。

一个典型的 APScheduler 实例包含数十个作业,这些作业执行常规 Python 函数。 APScheduler 实例可以调度的作业数量没有限制;它仅取决于机器的实际负载。默认情况下,APScheduler 将所有作业存储在内存中。如果您希望您的作业在进程重启后仍然存在并从上次触发时继续触发,您可以将这些作业存储在数据库中,例如任何 RDBMS、Redis、MongoDB 等

根据您的应用程序 运行,它可以 运行 作为线程、asyncio 任务或其他。初始化时,APScheduler 不会执行任何操作,除非您将 Python 函数添加为作业。添加所有作业后,您需要“启动”调度程序。有关如何使用 APScheduler 的简单示例,这里有一段代码可以正常工作。

from urllib.request import urlopen
from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()

@scheduler.scheduled_job("interval", seconds=10)
def keep_warm():
    urlopen("https://enqueuezero.com", timeout=10)
    
scheduler.start()

这确保每 10 秒请求一次 URL。该程序 运行s 作为阻塞进程。如果您想将它们与您的应用程序共存,您可以考虑使用 BackgroundSchedulerAsyncIOScheduler

这里是 BackgroundScheduler 的一些代码片段。

from datetime import datetime
import time
import os

from apscheduler.schedulers.background import BackgroundScheduler


def tick():
    print('Tick! The time is: %s' % datetime.now())


if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    scheduler.add_job(tick, 'interval', seconds=3)
    scheduler.start()
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    try:
        # This is here to simulate application activity (which keeps the main thread alive).
        while True:
            time.sleep(2)
    except (KeyboardInterrupt, SystemExit):
        # Not strictly necessary if daemonic mode is enabled but should be done if possible
        scheduler.shutdown()

下面的代码将演示如何使用 asyncio 兼容调度程序来安排以 3 秒为间隔执行的作业。

import asyncio
import os
from datetime import datetime

from apscheduler.schedulers.asyncio import AsyncIOScheduler


def tick():
    print('Tick! The time is: %s' % datetime.now())


if __name__ == '__main__':
    scheduler = AsyncIOScheduler()
    scheduler.add_job(tick, 'interval', seconds=3)
    scheduler.start()
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    # Execution will block here until Ctrl+C (Ctrl+Break on Windows) is pressed.
    try:
        asyncio.get_event_loop().run_forever()
    except (KeyboardInterrupt, SystemExit):
        pass

您可以使用此 link!

获取更多有关 apscheduler 的示例

因此,在 APScheduler 中有 3 个重要组件:

  1. 调度器
  2. 执行者
  3. 数据存储

对于这个问题,只有 1 和 2 是相关的。

Scheduler 只是根据间隔设置决定何时调用作业,在 AsyncIOScheduler 的情况下,它使用 asyncio 使等待时间不阻塞。它 运行s 在与 main 相同的进程和线程中。如果您的应用程序已经 运行 正在使用异步循环,这将非常有用,因为它可以节省 运行 新 proccess/thread.

的开销

现在,当一个job需要执行时,调用的是Executor,在AsyncIOScheduler的情况下,默认是使用AsyncIOExecutor,其中运行s与调度程序相同的线程和进程,如果作业函数被签名为异步,否则它使用 asyncio 的 run_in_executor,它 运行 在线程池中。

这让我们想到了最后一个问题,如果我们将 AsyncIOSchedulerThreadPoolExecutor 一起使用会怎样?好吧,从技术上讲,这与使用具有非异步功能的默认 Executor 相同,它将 运行 它放在线程池中,但调度程序将保留在主线程中。