SQLAlchemy:在不同数据库之间加入并在模块中使用不同文件

SQLAlchemy: JOIN between different databases AND using different files in a module

堆栈

我正在使用:

总结

我正在为多个遗留数据库构建一个统一的 FastAPI 项目(存储在 MariaDB 10.3 上的后端 - 某些遗留软件必须保留结构)。

我的 SQLA 安装程序使用数据库模块来执行以下操作:

/databases.py

import dotenv
import os

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

import .models as models

dotenv.load_dotenv()

engines = {
    'parts': create_engine("mysql+pymysql://" + os.environ['DB_URL'] + "/parts", pool_pre_ping=True, pool_recycle=300),
    'shop': create_engine("mysql+pymysql://" + os.environ['DB_URL'] + "/shop", pool_pre_ping=True, pool_recycle=300),
    'purchasing': create_engine("mysql+pymysql://" + os.environ['DB_URL'] + "/purchasing", pool_pre_ping=True, pool_recycle=300),
    "company": create_engine("mysql+pymysql://" + os.environ['DB_URL'] + "/company", pool_pre_ping=True, pool_recycle=300),
    "auth": create_engine("mysql+pymysql://" + os.environ['DB_URL'] + "/auth", pool_pre_ping=True, pool_recycle=300),
}

DBSession = sessionmaker(autocommit=False, autoflush=False, binds={
    # Catalogue
    models.Shop.Catalogue: engines["shop"],
    models.Shop.Sections: engines["shop"],
    models.Shop.Orders: engines["shop"],
    # ...
    # Parts
    models.Parts.Part: engines["parts"],
    models.Parts.BinLocations: engines["parts"],

    # ...
    #Purchasing
    models.Purchasing.SupplierOrder: engines["purchasing"],
    models.Purchasing.SupplierOrder: engines["purchasing"],
    # Company Data
    models.Company.Staffmember: engines["company"],
    models.Company.Suppliers: engines["company"],
    # API Auth
    models.Auth.User: engines["auth"],
    models.Auth.Privileges: engines["auth"],
})

# Dependency
def getDb():
    db = DBSession()
    try:
        yield db
    finally:
        db.close()

对每个模型都这样做有点费力,但它确实有效。

因为我有几个数据库,所以我认为为每个数据库创建一个带有子文件的 models 模块是合乎逻辑的,例如models.Partsmodels.Shopmodels.Purchasemodels.Companymodels.Auth

/models/init.py


from importlib.metadata import metadata
from sqlalchemy.orm import declarative_base

base = declarative_base()

from . import Auth, Parts, Shop, Catalogue, Purchasing, Shop

我可以通过在 models__init__.py 中导入 Base 对象并将其导入每个子文件来成功创建关系。例如:

/models/Auth.py

from . import base as Base

from sqlalchemy.orm import relationship
from sqlalchemy import Column, Integer, String, Numeric, Date, DateTime, ForeignKey, null, or_, and_

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, nullable=False, primary_key=True)
    username = Column(String(256), nullable=False)
    passhash = Column(String(512), nullable=False)
    email = Column(String, nullable=False)
    enabled = Column(Integer, nullable=True)
    staffmember_id = Column(Integer, nullable=False)

    staffmember = relationship("Company.Staffmember", uselist=False)

/models/Company.py

from . import base as Base

from sqlalchemy.orm import relationship
from sqlalchemy import Column, Integer, String, Numeric, Date, DateTime, ForeignKey, null, or_, and_

class Staffmebmer(Base):
    __tablename__ = 'staffmembers'

    id = Column(Integer, ForeignKey("users.staffmember_id"), nullable=False, primary_key=True)
    order = Column(Integer, default=0, nullable=False)
    name = Column(String, nullable=True)
    initial = Column(String, nullable=True)
    email = Column(String, nullable=False)
    enabled = Column(Integer, default=0, nullable=False)

    relationship("Auth.User", back_populates="staffmember")

以下路线工作得很好:

demo.py


from fastapi import Depends

from sqlalchemy.orm import Session

from .. import app, databases, models

@app.get("/api/user/{id}")
async def read_items(id: int, db: Session=Depends(databases.getDb)):
    user = db.query(models.Auth.User).filter(
        models.Auth.User.id == id
    ).first()

    user.staffmember

    return user

访问此 URL returns: (是的,我知道这不安全,它仅用于说明目的,以表明关系有效!)

{
  "username": "mark",
  "passhash": "<my hash>",
  "enabled": 1,
  "email": "mark@demo.com",
  "id": 1,
  "staffmember_id": 5,
  "staffmember": {
    "order": 20,
    "name": "Mark",
    "email": "mark@demo.com",
    "kStaffmember": 5,
    "initial": "MB",
    "enabled": 1
  }
}

但是,我想使用 steffmember 首字母作为可能的用户名,所以当我在我的 OAUTH 授权脚本中查询用户时,我尝试使用:


from ..models import Auth, Company

# 'username' is provided by the auth script from the standard username/password OAuth fields

def get_user(db: Session, username: str):
    db_user_data = db.query(Auth.User).join(Company.Staffmember).filter(
        or_(
            Auth.User.username == username,
            Auth.User.email == username,
            Company.Staffmember.initial == username
        )
    ).first()

我得到一个异常:

(pymysql.err.ProgrammingError) (1146, "Table 'auth.staffmembers' doesn't exist")

我是否以正确的方式处理这整件事?是否有解决此问题的可能方法?

如果有人遇到这个问题并需要一个合理的答案,我将其提交到 SQLAlchemy Git 讨论页面,并得到了一个非常理智的回复,帮助我解决了这个问题。

https://github.com/sqlalchemy/sqlalchemy/discussions/8027

总结:

  • 对于不同的数据库,您不需要多个引擎连接到相同的MySQL/mariadb服务器。您只需要在其中一个数据库上启动会话 - 请记住它是 数据库 的名称,而不是 Python SQA 代码中的模型或模块。

我的新 databases.py:

import dotenv
import os

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

import .models as models

dotenv.load_dotenv()

DBengine = create_engine("mysql+pymysql://" + os.environ['DB_URL'] + "/parts", pool_pre_ping=True, pool_recycle=300)

DBSession = sessionmaker(bind=DBengine autocommit=False, autoflush=False)

# Dependency
def getDb():
    db = DBSession()
    try:
        yield db
    finally:
        db.close()
  • 如果您使用上述样式,则需要更明确地表达您的 ForeignKey() 语句,例如ForeignKey("<db>.<table>.<field>") 所以你明确地告诉 SQA 哪个数据库和 table 要查找每个数据库。

  • 您将需要将数据库名称作为每个模型的名称作为模式,例如添加 __table_args__ = { "schema": "<database name>" } - 请记住它是 数据库的名称 而不是 Python SQA 代码中的模型或模块。

新建/modules/Auth.py

from . import base as Base

from sqlalchemy.orm import relationship
from sqlalchemy import Column, Integer, String, Numeric, Date, DateTime, ForeignKey, null, or_, and_

class User(Base):
    __tablename__ = 'users' #table is called 'users'
    __table_args__ = { "schema": "auth" } #database is called 'auth'

    id = Column(Integer, nullable=False, primary_key=True)
    username = Column(String(256), nullable=False)
    passhash = Column(String(512), nullable=False)
    email = Column(String, nullable=False)
    enabled = Column(Integer, nullable=True)
    staffmember_id = Column(Integer, nullable=False)

    staffmember = relationship("Company.Staffmember", uselist=False)

新建/models/Company.py

from . import base as Base

from sqlalchemy.orm import relationship
from sqlalchemy import Column, Integer, String, Numeric, Date, DateTime, ForeignKey, null, or_, and_

class Staffmember(Base):
    __tablename__ = 'staffmembers' #table is called 'staffmembers'
    __table_args__ = { "schema": "company" } #database is called 'company'

    id = Column(Integer, ForeignKey("auth.users.staffmember_id"), nullable=False, primary_key=True)
    # ForeignKey now needs to know the database AND table name for the field it refers to
    order = Column(Integer, default=0, nullable=False)
    name = Column(String, nullable=True)
    initial = Column(String, nullable=True)
    email = Column(String, nullable=False)
    enabled = Column(Integer, default=0, nullable=False)

    relationship("Auth.User", back_populates="staffmember")

一旦您使用了这个过程,SQA 就会知道在连接和关系中添加正确的数据库名称前缀,一切都会正常进行。