如何管理 Tornado 应用程序中资源的异步启动和关闭?
How do I manage asynchronous startup and shutdown of resources in a Tornado application?
我想在 Quart 或 FastAPI 中使用 aioredis in a Tornado application. However, I couldn't figure out a way to implement an async startup and shutdown of its resources since the Application class has no ASGI Lifespan 事件。
换句话说,我需要在应用程序开始为请求提供服务之前创建一个 Redis 池,并在应用程序完成或即将结束后立即释放该池。问题是 aioredis 池创建是异步的,但 Tornado 应用程序创建是同步的。
基本应用程序如下所示:
import os
from aioredis import create_redis_pool
from aioredis.commands import Redis
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
from tornado.web import Application
from .handlers import hello
redis: Redis = None
async def start_resources() -> None:
'''
Initialize resources such as Redis and Database connections
'''
global redis
REDIS_HOST = os.environ['REDIS_HOST']
REDIS_PORT = os.environ['REDIS_PORT']
redis = await create_redis_pool((REDIS_HOST, REDIS_PORT), encoding='utf-8')
async def close_resources() -> None:
'''
Release resources
'''
redis.close()
await redis.wait_closed()
def create_app() -> Application:
app = Application([
("/hello", hello.HelloHandler),
])
return app
if __name__ == '__main__':
app = create_app()
http_server = HTTPServer(app)
http_server.listen(8000)
IOLoop.current().start()
重要的是我也可以在测试期间使用启动和关闭功能。
有什么想法吗?
要创建池,请在开始循环之前run_sync
调用协程:
if __name__ == '__main__':
IOLoop.current().run_sync(start_resources)
...
要在程序退出前销毁池,请使用 try...finally
块,这样由于未处理的异常而导致的突然退出也被考虑在内:
if __name__ == '__main__':
# create db pool
IOLoop.current().run_sync(start_resources)
...
try:
# start the loop
IOLoop.current().start()
except:
pass
finally:
# this will close the pool before exiting
IOLoop.current().run_sync(close_resources)
是正确的,让我走上了正确的轨道。我只是认为它可以改进一点,所以我发布了这个替代方案:
from contextlib import contextmanager
# ... previous code omitted for brevity
@contextmanager
def create_app() -> Application:
IOLoop.current().run_sync(start_resources)
try:
app = Application([
("/hello", hello.HelloHandler),
])
yield app
finally:
IOLoop.current().run_sync(close_resources)
if __name__ == '__main__':
with create_app() as app:
http_server = HTTPServer(app)
http_server.listen(8000)
IOLoop.current().start()
此外,要在 pytest
和 pytest-tornado
的测试中使用此代码,您应该创建一个 conftest.py
文件,如下所示:
from typing import Iterator
from pytest import fixture
from tornado.platform.asyncio import AsyncIOLoop
from tornado.web import Application
from app.main import create_app
@fixture
def app(io_loop: AsyncIOLoop) -> Iterator[Application]:
'''
Return a Tornado.web.Application object with initialized resources
'''
with create_app() as app:
yield app
请注意,将 io_loop
声明为依赖项注入很重要。
我想在 Quart 或 FastAPI 中使用 aioredis in a Tornado application. However, I couldn't figure out a way to implement an async startup and shutdown of its resources since the Application class has no ASGI Lifespan 事件。 换句话说,我需要在应用程序开始为请求提供服务之前创建一个 Redis 池,并在应用程序完成或即将结束后立即释放该池。问题是 aioredis 池创建是异步的,但 Tornado 应用程序创建是同步的。
基本应用程序如下所示:
import os
from aioredis import create_redis_pool
from aioredis.commands import Redis
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
from tornado.web import Application
from .handlers import hello
redis: Redis = None
async def start_resources() -> None:
'''
Initialize resources such as Redis and Database connections
'''
global redis
REDIS_HOST = os.environ['REDIS_HOST']
REDIS_PORT = os.environ['REDIS_PORT']
redis = await create_redis_pool((REDIS_HOST, REDIS_PORT), encoding='utf-8')
async def close_resources() -> None:
'''
Release resources
'''
redis.close()
await redis.wait_closed()
def create_app() -> Application:
app = Application([
("/hello", hello.HelloHandler),
])
return app
if __name__ == '__main__':
app = create_app()
http_server = HTTPServer(app)
http_server.listen(8000)
IOLoop.current().start()
重要的是我也可以在测试期间使用启动和关闭功能。
有什么想法吗?
要创建池,请在开始循环之前run_sync
调用协程:
if __name__ == '__main__':
IOLoop.current().run_sync(start_resources)
...
要在程序退出前销毁池,请使用 try...finally
块,这样由于未处理的异常而导致的突然退出也被考虑在内:
if __name__ == '__main__':
# create db pool
IOLoop.current().run_sync(start_resources)
...
try:
# start the loop
IOLoop.current().start()
except:
pass
finally:
# this will close the pool before exiting
IOLoop.current().run_sync(close_resources)
from contextlib import contextmanager
# ... previous code omitted for brevity
@contextmanager
def create_app() -> Application:
IOLoop.current().run_sync(start_resources)
try:
app = Application([
("/hello", hello.HelloHandler),
])
yield app
finally:
IOLoop.current().run_sync(close_resources)
if __name__ == '__main__':
with create_app() as app:
http_server = HTTPServer(app)
http_server.listen(8000)
IOLoop.current().start()
此外,要在 pytest
和 pytest-tornado
的测试中使用此代码,您应该创建一个 conftest.py
文件,如下所示:
from typing import Iterator
from pytest import fixture
from tornado.platform.asyncio import AsyncIOLoop
from tornado.web import Application
from app.main import create_app
@fixture
def app(io_loop: AsyncIOLoop) -> Iterator[Application]:
'''
Return a Tornado.web.Application object with initialized resources
'''
with create_app() as app:
yield app
请注意,将 io_loop
声明为依赖项注入很重要。