如何管理 Tornado 应用程序中资源的异步启动和关闭?

How do I manage asynchronous startup and shutdown of resources in a Tornado application?

我想在 Quart 或 FastAPI 中使用 aioredis in a Tornado application. However, I couldn't figure out a way to implement an async startup and shutdown of its resources since the Application class has no ASGI Lifespan 事件。 换句话说,我需要在应用程序开始为请求提供服务之前创建一个 Redis 池,并在应用程序完成或即将结束后立即释放该池。问题是 aioredis 池创建是异步的,但 Tornado 应用程序创建是同步的。

基本应用程序如下所示:

    import os

from aioredis import create_redis_pool
from aioredis.commands import Redis
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
from tornado.web import Application

from .handlers import hello

redis: Redis = None


async def start_resources() -> None:
    '''
    Initialize resources such as Redis and Database connections
    '''
    global redis
    REDIS_HOST = os.environ['REDIS_HOST']
    REDIS_PORT = os.environ['REDIS_PORT']
    redis = await create_redis_pool((REDIS_HOST, REDIS_PORT), encoding='utf-8')


async def close_resources() -> None:
    '''
    Release resources
    '''
    redis.close()
    await redis.wait_closed()


def create_app() -> Application:
    app = Application([
        ("/hello", hello.HelloHandler),
    ])

    return app


if __name__ == '__main__':

    app = create_app()
    http_server = HTTPServer(app)
    http_server.listen(8000)
    IOLoop.current().start()

重要的是我也可以在测试期间使用启动和关闭功能。

有什么想法吗?

要创建池,请在开始循环之前run_sync 调用协程:

if __name__ == '__main__':
    IOLoop.current().run_sync(start_resources)
    ...

要在程序退出前销毁池,请使用 try...finally 块,这样由于未处理的异常而导致的突然退出也被考虑在内:

if __name__ == '__main__': 
    # create db pool
    IOLoop.current().run_sync(start_resources)

    ...

    try:
        # start the loop
        IOLoop.current().start()
    except:
        pass
    finally:
        # this will close the pool before exiting
        IOLoop.current().run_sync(close_resources)

是正确的,让我走上了正确的轨道。我只是认为它可以改进一点,所以我发布了这个替代方案:

from contextlib import contextmanager

# ... previous code omitted for brevity


@contextmanager
def create_app() -> Application:
    IOLoop.current().run_sync(start_resources)
    try:
        app = Application([
            ("/hello", hello.HelloHandler),
        ])
        yield app
    finally:
        IOLoop.current().run_sync(close_resources)


if __name__ == '__main__':
    with create_app() as app:
        http_server = HTTPServer(app)
        http_server.listen(8000)
        IOLoop.current().start()

此外,要在 pytestpytest-tornado 的测试中使用此代码,您应该创建一个 conftest.py 文件,如下所示:

from typing import Iterator

from pytest import fixture
from tornado.platform.asyncio import AsyncIOLoop
from tornado.web import Application

from app.main import create_app


@fixture
def app(io_loop: AsyncIOLoop) -> Iterator[Application]:
    '''
    Return a Tornado.web.Application object with initialized resources
    '''
    with create_app() as app:
        yield app

请注意,将 io_loop 声明为依赖项注入很重要。