如何在 MySQL 的 FastAPI 或 ARQ 中正确使用 SQLalchemy?

How to correctly use SQLalchemy within FastAPI or ARQ for MySQL?

我在我的 FastAPI 项目中使用 SQLalchemy 连接到 MySQL 服务器的数据库,该项目由实际的 API 和使用 arq 编写的后端工作者组成。两者共享相同的代码库、模型,因此也共享相同的数据库代码。

我基本上是这样创建数据库连接的:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import config as c

SQLALCHEMY_DATABASE_URL = "%s://%s:%s@%s:%d/%s" % (c.settings.db_type, c.settings.db_username, c.settings.db_password, c.settings.db_host, c.settings.db_port, c.settings.db_database)

engine = create_engine(SQLALCHEMY_DATABASE_URL)

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

大部分时间都有效。然而,请求有时会失败并显示错误消息

sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) (2006, "MySQL server has gone away (BrokenPipeError(32, 'Broken pipe'))")

然而,这只发生在工作部分,还没有在 FastAPI 容器中看到它,即使两者都使用相同的代码。在 arq 任务中,每当我需要数据库连接时,我实际上是在做:

from core.database import get_db

# ...

db = get_db().__next__()

有什么办法可以避免这个问题吗?谢谢!

在我的例子中,我忘记在某些情况下关闭数据库连接,因此没有 return 连接到默认池。日志实际上告诉了我这一点。我不确定为什么会导致上面的 BrokenPipe 错误,但修复两个受影响的工作方法解决了这个问题。