使用 FastAPI 在 Uvicorn 中处理信号

Signal handling in Uvicorn with FastAPI

我有一个使用 UvicornFastAPI 的应用程序。我还打开了一些连接(例如 MongoDB)。一旦出现某些信号(SIGINTSIGTERMSIGKILL),我想优雅地关闭这些连接。

我的 server.py 文件:

import uvicorn
import fastapi
import signal
import asyncio

from source.gql import gql


app = fastapi.FastAPI()

app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
app.mount("/graphql", gql)

# handle signals
HANDLED_SIGNALS = (
    signal.SIGINT,
    signal.SIGTERM
)

loop = asyncio.get_event_loop()
for sig in HANDLED_SIGNALS:
    loop.add_signal_handler(sig, _some_callback_func)

if __name__ == "__main__":
    uvicorn.run(app, port=6900)

不幸的是,我尝试实现此目的的方法不起作用。当我在终端中尝试 Ctrl+C 时,没有任何反应。我相信这是因为 Uvicorn 是在不同的线程中启动的...

正确的做法是什么?我注意到了 uvicorn.Server.install_signal_handlers() 函数,但没有幸运地使用它...

FastAPI 允许定义需要在应用程序启动之前或应用程序关闭时执行的事件处理程序(函数)。因此,您可以使用 shutdown 事件,如 here:

所述
@app.on_event("shutdown")
def shutdown_event():
    # close connections here