SQLAlchemy:在不同数据库之间加入并在模块中使用不同文件
SQLAlchemy: JOIN between different databases AND using different files in a module
堆栈
我正在使用:
- Python3.10.x
- FastAPI 0.75.x
- SQLAlchemy 1.4.3x
总结
我正在为多个遗留数据库构建一个统一的 FastAPI 项目(存储在 MariaDB 10.3 上的后端 - 某些遗留软件必须保留结构)。
我的 SQLA 安装程序使用数据库模块来执行以下操作:
/databases.py
import dotenv
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import .models as models
dotenv.load_dotenv()
engines = {
'parts': create_engine("mysql+pymysql://" + os.environ['DB_URL'] + "/parts", pool_pre_ping=True, pool_recycle=300),
'shop': create_engine("mysql+pymysql://" + os.environ['DB_URL'] + "/shop", pool_pre_ping=True, pool_recycle=300),
'purchasing': create_engine("mysql+pymysql://" + os.environ['DB_URL'] + "/purchasing", pool_pre_ping=True, pool_recycle=300),
"company": create_engine("mysql+pymysql://" + os.environ['DB_URL'] + "/company", pool_pre_ping=True, pool_recycle=300),
"auth": create_engine("mysql+pymysql://" + os.environ['DB_URL'] + "/auth", pool_pre_ping=True, pool_recycle=300),
}
DBSession = sessionmaker(autocommit=False, autoflush=False, binds={
# Catalogue
models.Shop.Catalogue: engines["shop"],
models.Shop.Sections: engines["shop"],
models.Shop.Orders: engines["shop"],
# ...
# Parts
models.Parts.Part: engines["parts"],
models.Parts.BinLocations: engines["parts"],
# ...
#Purchasing
models.Purchasing.SupplierOrder: engines["purchasing"],
models.Purchasing.SupplierOrder: engines["purchasing"],
# Company Data
models.Company.Staffmember: engines["company"],
models.Company.Suppliers: engines["company"],
# API Auth
models.Auth.User: engines["auth"],
models.Auth.Privileges: engines["auth"],
})
# Dependency
def getDb():
db = DBSession()
try:
yield db
finally:
db.close()
对每个模型都这样做有点费力,但它确实有效。
因为我有几个数据库,所以我认为为每个数据库创建一个带有子文件的 models
模块是合乎逻辑的,例如models.Parts
、models.Shop
、models.Purchase
、models.Company
、models.Auth
等
/models/init.py
from importlib.metadata import metadata
from sqlalchemy.orm import declarative_base
base = declarative_base()
from . import Auth, Parts, Shop, Catalogue, Purchasing, Shop
我可以通过在 models
的 __init__.py
中导入 Base
对象并将其导入每个子文件来成功创建关系。例如:
/models/Auth.py
from . import base as Base
from sqlalchemy.orm import relationship
from sqlalchemy import Column, Integer, String, Numeric, Date, DateTime, ForeignKey, null, or_, and_
class User(Base):
__tablename__ = 'users'
id = Column(Integer, nullable=False, primary_key=True)
username = Column(String(256), nullable=False)
passhash = Column(String(512), nullable=False)
email = Column(String, nullable=False)
enabled = Column(Integer, nullable=True)
staffmember_id = Column(Integer, nullable=False)
staffmember = relationship("Company.Staffmember", uselist=False)
/models/Company.py
from . import base as Base
from sqlalchemy.orm import relationship
from sqlalchemy import Column, Integer, String, Numeric, Date, DateTime, ForeignKey, null, or_, and_
class Staffmebmer(Base):
__tablename__ = 'staffmembers'
id = Column(Integer, ForeignKey("users.staffmember_id"), nullable=False, primary_key=True)
order = Column(Integer, default=0, nullable=False)
name = Column(String, nullable=True)
initial = Column(String, nullable=True)
email = Column(String, nullable=False)
enabled = Column(Integer, default=0, nullable=False)
relationship("Auth.User", back_populates="staffmember")
以下路线工作得很好:
demo.py
from fastapi import Depends
from sqlalchemy.orm import Session
from .. import app, databases, models
@app.get("/api/user/{id}")
async def read_items(id: int, db: Session=Depends(databases.getDb)):
user = db.query(models.Auth.User).filter(
models.Auth.User.id == id
).first()
user.staffmember
return user
访问此 URL returns:
(是的,我知道这不安全,它仅用于说明目的,以表明关系有效!)
{
"username": "mark",
"passhash": "<my hash>",
"enabled": 1,
"email": "mark@demo.com",
"id": 1,
"staffmember_id": 5,
"staffmember": {
"order": 20,
"name": "Mark",
"email": "mark@demo.com",
"kStaffmember": 5,
"initial": "MB",
"enabled": 1
}
}
但是,我想使用 steffmember 首字母作为可能的用户名,所以当我在我的 OAUTH 授权脚本中查询用户时,我尝试使用:
from ..models import Auth, Company
# 'username' is provided by the auth script from the standard username/password OAuth fields
def get_user(db: Session, username: str):
db_user_data = db.query(Auth.User).join(Company.Staffmember).filter(
or_(
Auth.User.username == username,
Auth.User.email == username,
Company.Staffmember.initial == username
)
).first()
我得到一个异常:
(pymysql.err.ProgrammingError) (1146, "Table 'auth.staffmembers' doesn't exist")
我是否以正确的方式处理这整件事?是否有解决此问题的可能方法?
如果有人遇到这个问题并需要一个合理的答案,我将其提交到 SQLAlchemy Git 讨论页面,并得到了一个非常理智的回复,帮助我解决了这个问题。
https://github.com/sqlalchemy/sqlalchemy/discussions/8027
总结:
- 对于不同的数据库,您不需要多个引擎连接到相同的MySQL/mariadb服务器。您只需要在其中一个数据库上启动会话 - 请记住它是 数据库 的名称,而不是 Python SQA 代码中的模型或模块。
我的新 databases.py:
import dotenv
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import .models as models
dotenv.load_dotenv()
DBengine = create_engine("mysql+pymysql://" + os.environ['DB_URL'] + "/parts", pool_pre_ping=True, pool_recycle=300)
DBSession = sessionmaker(bind=DBengine autocommit=False, autoflush=False)
# Dependency
def getDb():
db = DBSession()
try:
yield db
finally:
db.close()
如果您使用上述样式,则需要更明确地表达您的 ForeignKey()
语句,例如ForeignKey("<db>.<table>.<field>")
所以你明确地告诉 SQA 哪个数据库和 table 要查找每个数据库。
您将需要将数据库名称作为每个模型的名称作为模式,例如添加 __table_args__ = { "schema": "<database name>" }
- 请记住它是 数据库的名称 而不是 Python SQA 代码中的模型或模块。
新建/modules/Auth.py
from . import base as Base
from sqlalchemy.orm import relationship
from sqlalchemy import Column, Integer, String, Numeric, Date, DateTime, ForeignKey, null, or_, and_
class User(Base):
__tablename__ = 'users' #table is called 'users'
__table_args__ = { "schema": "auth" } #database is called 'auth'
id = Column(Integer, nullable=False, primary_key=True)
username = Column(String(256), nullable=False)
passhash = Column(String(512), nullable=False)
email = Column(String, nullable=False)
enabled = Column(Integer, nullable=True)
staffmember_id = Column(Integer, nullable=False)
staffmember = relationship("Company.Staffmember", uselist=False)
新建/models/Company.py
from . import base as Base
from sqlalchemy.orm import relationship
from sqlalchemy import Column, Integer, String, Numeric, Date, DateTime, ForeignKey, null, or_, and_
class Staffmember(Base):
__tablename__ = 'staffmembers' #table is called 'staffmembers'
__table_args__ = { "schema": "company" } #database is called 'company'
id = Column(Integer, ForeignKey("auth.users.staffmember_id"), nullable=False, primary_key=True)
# ForeignKey now needs to know the database AND table name for the field it refers to
order = Column(Integer, default=0, nullable=False)
name = Column(String, nullable=True)
initial = Column(String, nullable=True)
email = Column(String, nullable=False)
enabled = Column(Integer, default=0, nullable=False)
relationship("Auth.User", back_populates="staffmember")
一旦您使用了这个过程,SQA 就会知道在连接和关系中添加正确的数据库名称前缀,一切都会正常进行。
堆栈
我正在使用:
- Python3.10.x
- FastAPI 0.75.x
- SQLAlchemy 1.4.3x
总结
我正在为多个遗留数据库构建一个统一的 FastAPI 项目(存储在 MariaDB 10.3 上的后端 - 某些遗留软件必须保留结构)。
我的 SQLA 安装程序使用数据库模块来执行以下操作:
/databases.py
import dotenv
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import .models as models
dotenv.load_dotenv()
engines = {
'parts': create_engine("mysql+pymysql://" + os.environ['DB_URL'] + "/parts", pool_pre_ping=True, pool_recycle=300),
'shop': create_engine("mysql+pymysql://" + os.environ['DB_URL'] + "/shop", pool_pre_ping=True, pool_recycle=300),
'purchasing': create_engine("mysql+pymysql://" + os.environ['DB_URL'] + "/purchasing", pool_pre_ping=True, pool_recycle=300),
"company": create_engine("mysql+pymysql://" + os.environ['DB_URL'] + "/company", pool_pre_ping=True, pool_recycle=300),
"auth": create_engine("mysql+pymysql://" + os.environ['DB_URL'] + "/auth", pool_pre_ping=True, pool_recycle=300),
}
DBSession = sessionmaker(autocommit=False, autoflush=False, binds={
# Catalogue
models.Shop.Catalogue: engines["shop"],
models.Shop.Sections: engines["shop"],
models.Shop.Orders: engines["shop"],
# ...
# Parts
models.Parts.Part: engines["parts"],
models.Parts.BinLocations: engines["parts"],
# ...
#Purchasing
models.Purchasing.SupplierOrder: engines["purchasing"],
models.Purchasing.SupplierOrder: engines["purchasing"],
# Company Data
models.Company.Staffmember: engines["company"],
models.Company.Suppliers: engines["company"],
# API Auth
models.Auth.User: engines["auth"],
models.Auth.Privileges: engines["auth"],
})
# Dependency
def getDb():
db = DBSession()
try:
yield db
finally:
db.close()
对每个模型都这样做有点费力,但它确实有效。
因为我有几个数据库,所以我认为为每个数据库创建一个带有子文件的 models
模块是合乎逻辑的,例如models.Parts
、models.Shop
、models.Purchase
、models.Company
、models.Auth
等
/models/init.py
from importlib.metadata import metadata
from sqlalchemy.orm import declarative_base
base = declarative_base()
from . import Auth, Parts, Shop, Catalogue, Purchasing, Shop
我可以通过在 models
的 __init__.py
中导入 Base
对象并将其导入每个子文件来成功创建关系。例如:
/models/Auth.py
from . import base as Base
from sqlalchemy.orm import relationship
from sqlalchemy import Column, Integer, String, Numeric, Date, DateTime, ForeignKey, null, or_, and_
class User(Base):
__tablename__ = 'users'
id = Column(Integer, nullable=False, primary_key=True)
username = Column(String(256), nullable=False)
passhash = Column(String(512), nullable=False)
email = Column(String, nullable=False)
enabled = Column(Integer, nullable=True)
staffmember_id = Column(Integer, nullable=False)
staffmember = relationship("Company.Staffmember", uselist=False)
/models/Company.py
from . import base as Base
from sqlalchemy.orm import relationship
from sqlalchemy import Column, Integer, String, Numeric, Date, DateTime, ForeignKey, null, or_, and_
class Staffmebmer(Base):
__tablename__ = 'staffmembers'
id = Column(Integer, ForeignKey("users.staffmember_id"), nullable=False, primary_key=True)
order = Column(Integer, default=0, nullable=False)
name = Column(String, nullable=True)
initial = Column(String, nullable=True)
email = Column(String, nullable=False)
enabled = Column(Integer, default=0, nullable=False)
relationship("Auth.User", back_populates="staffmember")
以下路线工作得很好:
demo.py
from fastapi import Depends
from sqlalchemy.orm import Session
from .. import app, databases, models
@app.get("/api/user/{id}")
async def read_items(id: int, db: Session=Depends(databases.getDb)):
user = db.query(models.Auth.User).filter(
models.Auth.User.id == id
).first()
user.staffmember
return user
访问此 URL returns: (是的,我知道这不安全,它仅用于说明目的,以表明关系有效!)
{
"username": "mark",
"passhash": "<my hash>",
"enabled": 1,
"email": "mark@demo.com",
"id": 1,
"staffmember_id": 5,
"staffmember": {
"order": 20,
"name": "Mark",
"email": "mark@demo.com",
"kStaffmember": 5,
"initial": "MB",
"enabled": 1
}
}
但是,我想使用 steffmember 首字母作为可能的用户名,所以当我在我的 OAUTH 授权脚本中查询用户时,我尝试使用:
from ..models import Auth, Company
# 'username' is provided by the auth script from the standard username/password OAuth fields
def get_user(db: Session, username: str):
db_user_data = db.query(Auth.User).join(Company.Staffmember).filter(
or_(
Auth.User.username == username,
Auth.User.email == username,
Company.Staffmember.initial == username
)
).first()
我得到一个异常:
(pymysql.err.ProgrammingError) (1146, "Table 'auth.staffmembers' doesn't exist")
我是否以正确的方式处理这整件事?是否有解决此问题的可能方法?
如果有人遇到这个问题并需要一个合理的答案,我将其提交到 SQLAlchemy Git 讨论页面,并得到了一个非常理智的回复,帮助我解决了这个问题。
https://github.com/sqlalchemy/sqlalchemy/discussions/8027
总结:
- 对于不同的数据库,您不需要多个引擎连接到相同的MySQL/mariadb服务器。您只需要在其中一个数据库上启动会话 - 请记住它是 数据库 的名称,而不是 Python SQA 代码中的模型或模块。
我的新 databases.py:
import dotenv
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import .models as models
dotenv.load_dotenv()
DBengine = create_engine("mysql+pymysql://" + os.environ['DB_URL'] + "/parts", pool_pre_ping=True, pool_recycle=300)
DBSession = sessionmaker(bind=DBengine autocommit=False, autoflush=False)
# Dependency
def getDb():
db = DBSession()
try:
yield db
finally:
db.close()
如果您使用上述样式,则需要更明确地表达您的
ForeignKey()
语句,例如ForeignKey("<db>.<table>.<field>")
所以你明确地告诉 SQA 哪个数据库和 table 要查找每个数据库。您将需要将数据库名称作为每个模型的名称作为模式,例如添加
__table_args__ = { "schema": "<database name>" }
- 请记住它是 数据库的名称 而不是 Python SQA 代码中的模型或模块。
新建/modules/Auth.py
from . import base as Base
from sqlalchemy.orm import relationship
from sqlalchemy import Column, Integer, String, Numeric, Date, DateTime, ForeignKey, null, or_, and_
class User(Base):
__tablename__ = 'users' #table is called 'users'
__table_args__ = { "schema": "auth" } #database is called 'auth'
id = Column(Integer, nullable=False, primary_key=True)
username = Column(String(256), nullable=False)
passhash = Column(String(512), nullable=False)
email = Column(String, nullable=False)
enabled = Column(Integer, nullable=True)
staffmember_id = Column(Integer, nullable=False)
staffmember = relationship("Company.Staffmember", uselist=False)
新建/models/Company.py
from . import base as Base
from sqlalchemy.orm import relationship
from sqlalchemy import Column, Integer, String, Numeric, Date, DateTime, ForeignKey, null, or_, and_
class Staffmember(Base):
__tablename__ = 'staffmembers' #table is called 'staffmembers'
__table_args__ = { "schema": "company" } #database is called 'company'
id = Column(Integer, ForeignKey("auth.users.staffmember_id"), nullable=False, primary_key=True)
# ForeignKey now needs to know the database AND table name for the field it refers to
order = Column(Integer, default=0, nullable=False)
name = Column(String, nullable=True)
initial = Column(String, nullable=True)
email = Column(String, nullable=False)
enabled = Column(Integer, default=0, nullable=False)
relationship("Auth.User", back_populates="staffmember")
一旦您使用了这个过程,SQA 就会知道在连接和关系中添加正确的数据库名称前缀,一切都会正常进行。