在 FastAPI (ASGI) 中正确启动与 SQLAlchemy Core 的异步连接
Properly starting async connection with SQLAlchemy Core in FastAPI (ASGI)
我正在尝试使用 FastAPI、Postgres 和 SQLAlchemy Core 构建异步后端 API 服务。
当我在没有前端的情况下测试我的 API 服务时,它 运行 完美,但是当前端开始多次同时访问后端时,我开始看到以下错误:
ERROR: Exception in ASGI application
...
sqlalchemy.exc.ResourceClosedError: This Connection is closed
这是我的设置:
- service/utils/db.py
from fastapi import Request
from sqlalchemy.ext.asyncio import create_async_engine
from service.utils.config import DB_DSN
...
def get_async_engine(echo: bool = True, future: bool = Ture):
global ENGINE
if not ENGINE:
ENGINE = create_async_engine(
str(DB_DSN), echo=echo, future=future
)
return ENGINE
async def get_async_conn():
engine = get_async_engine()
async with engine.begine() as conn:
Request.db_conn = conn
yield
- service/api/main.py
from fastapi import Depends, FastAPI
from service.utils.db import get_async_conn
def get_app():
app = FastAPI(
title="app", dependencies=[Depends(get_async_conn)]
)
...
return app
- 示例端点
from fastapi import APIRouter
router = APIRouter(...)
@router.get("")
async def get_items():
query = await Request.db_conn.execute(...) # some SQL query
items = query.fetchall()
关于如何正确管理异步引擎和与 SQLAlchemy Core 的连接有什么建议吗?
我意识到我正在将数据库连接分配给全局位置。当有超过 1 个请求到达 API 服务器时,这会出现问题,导致 get_async_conn()
在第一个连接被释放之前被连续调用,第二个请求替换了已设置的连接在 Request.db_conn
中。这主要是由于我的愚蠢+我试图找到一个我可以访问的地方来使用连接而不需要将连接对象从一个函数传递到另一个函数。尽管如此,我还是使用 encode/databases 解决了这个问题。非常容易集成,但还缺少细节。如果您正在寻找一个简单的数据库 connection/session/engine 管理,请试试这个。
我正在尝试使用 FastAPI、Postgres 和 SQLAlchemy Core 构建异步后端 API 服务。
当我在没有前端的情况下测试我的 API 服务时,它 运行 完美,但是当前端开始多次同时访问后端时,我开始看到以下错误:
ERROR: Exception in ASGI application
...
sqlalchemy.exc.ResourceClosedError: This Connection is closed
这是我的设置:
- service/utils/db.py
from fastapi import Request
from sqlalchemy.ext.asyncio import create_async_engine
from service.utils.config import DB_DSN
...
def get_async_engine(echo: bool = True, future: bool = Ture):
global ENGINE
if not ENGINE:
ENGINE = create_async_engine(
str(DB_DSN), echo=echo, future=future
)
return ENGINE
async def get_async_conn():
engine = get_async_engine()
async with engine.begine() as conn:
Request.db_conn = conn
yield
- service/api/main.py
from fastapi import Depends, FastAPI
from service.utils.db import get_async_conn
def get_app():
app = FastAPI(
title="app", dependencies=[Depends(get_async_conn)]
)
...
return app
- 示例端点
from fastapi import APIRouter
router = APIRouter(...)
@router.get("")
async def get_items():
query = await Request.db_conn.execute(...) # some SQL query
items = query.fetchall()
关于如何正确管理异步引擎和与 SQLAlchemy Core 的连接有什么建议吗?
我意识到我正在将数据库连接分配给全局位置。当有超过 1 个请求到达 API 服务器时,这会出现问题,导致 get_async_conn()
在第一个连接被释放之前被连续调用,第二个请求替换了已设置的连接在 Request.db_conn
中。这主要是由于我的愚蠢+我试图找到一个我可以访问的地方来使用连接而不需要将连接对象从一个函数传递到另一个函数。尽管如此,我还是使用 encode/databases 解决了这个问题。非常容易集成,但还缺少细节。如果您正在寻找一个简单的数据库 connection/session/engine 管理,请试试这个。