具有多个绑定的 SQLAlchemy - 动态选择绑定到查询
SQLAlchemy with multiple binds - Dynamically choose bind to query
我有 4 个不同的数据库,每个客户(医疗诊所)都有一个,它们都具有完全相同的结构。
在我的应用中,我有Patient
、Doctor
、Appointment
等模型
我们以其中之一为例:
class Patient(db.Model):
__tablename__ = "patients"
id = Column(Integer, primary_key=True)
first_name = Column(String, index=True)
last_name = Column(String, index=True)
date_of_birth = Column(Date, index=True)
我发现在绑定的帮助下我可以创建不同的数据库并将每个模型关联到不同的绑定。所以我有这个配置:
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://user:pass@localhost/main'
app.config['SQLALCHEMY_BINDS'] = {
'clinic1':'mysql://user:pass@localhost/clinic1',
'clinic2':'mysql://user:pass@localhost/clinic2',
'clinic3':'mysql://user:pass@localhost/clinic3',
'clinic4':'mysql://user:pass@localhost/clinic4'
}
现在我正在努力实现两件事:
- 我希望当我使用
db.create_all()
创建 table 时,它会在所有 4 个数据库 (clinic1->clinic4)[=45] 中创建 patients
table =]
- 我希望能够动态地选择一个特定的绑定(在 运行 时间内),这样任何诸如
Patient.query.filter().count()
的查询都将 运行 针对所选的绑定数据库
理想情况下,它会像这样:
with DbContext(bind='client1'):
patients_count = Patient.query.filter().count()
print(patients_count)
# outside of the `with` context we are back to the default bind
但是,这样做:
patients_count = Patient.query.filter().count()
不指定绑定,将引发错误(因为 patients
table 在默认绑定中不存在)
任何可以指导如何完成此操作的代码示例将不胜感激!
P.S。您可能会建议不要使用不同的数据库,而是使用具有不同列/tables 的数据库,但请坚持使用我的示例并尝试解释如何使用多个相同数据库的这种模式来完成此操作
谢谢!
1。在所有绑定中创建表
观察:db.create_all()
调用 self.get_tables_for_bind()
。
解决方案:覆盖 SQLAlchemy
get_tables_for_bind()
以支持 '__all__'
.
class MySQLAlchemy(SQLAlchemy):
def get_tables_for_bind(self, bind=None):
result = []
for table in self.Model.metadata.tables.values():
# if table.info.get('bind_key') == bind:
if table.info.get('bind_key') == bind or (bind is not None and table.info.get('bind_key') == '__all__'):
result.append(table)
return result
用法:
# db = SQLAlchemy(app) # Replace this
db = MySQLAlchemy(app) # with this
db.create_all()
2。动态选择特定绑定
观察:SignallingSession
get_bind()
负责确定绑定。
解决方案:
- 覆盖
SignallingSession
get_bind()
以从某些上下文中获取绑定密钥。
- 覆盖
SQLAlchemy
create_session()
以使用我们的自定义会话 class。
- 支持上下文选择
db
上的特定绑定以实现可访问性。
- 通过覆盖
SQLAlchemy
get_binds()
以恢复默认引擎,强制为使用 '__all__'
作为绑定键的表指定上下文。
class MySignallingSession(SignallingSession):
def __init__(self, db, *args, **kwargs):
super().__init__(db, *args, **kwargs)
self.db = db
def get_bind(self, mapper=None, clause=None):
if mapper is not None:
info = getattr(mapper.persist_selectable, 'info', {})
if info.get('bind_key') == '__all__':
info['bind_key'] = self.db.context_bind_key
try:
return super().get_bind(mapper=mapper, clause=clause)
finally:
info['bind_key'] = '__all__'
return super().get_bind(mapper=mapper, clause=clause)
class MySQLAlchemy(SQLAlchemy):
context_bind_key = None
@contextmanager
def context(self, bind=None):
_context_bind_key = self.context_bind_key
try:
self.context_bind_key = bind
yield
finally:
self.context_bind_key = _context_bind_key
def create_session(self, options):
return orm.sessionmaker(class_=MySignallingSession, db=self, **options)
def get_binds(self, app=None):
binds = super().get_binds(app=app)
# Restore default engine for table.info.get('bind_key') == '__all__'
app = self.get_app(app)
engine = self.get_engine(app, None)
tables = self.get_tables_for_bind('__all__')
binds.update(dict((table, engine) for table in tables))
return binds
def get_tables_for_bind(self, bind=None):
result = []
for table in self.Model.metadata.tables.values():
if table.info.get('bind_key') == bind or (bind is not None and table.info.get('bind_key') == '__all__'):
result.append(table)
return result
用法:
class Patient(db.Model):
__tablename__ = "patients"
__bind_key__ = "__all__" # Add this
测试用例:
with db.context(bind='clinic1'):
db.session.add(Patient())
db.session.flush() # Flush in 'clinic1'
with db.context(bind='clinic2'):
patients_count = Patient.query.filter().count()
print(patients_count) # 0 in 'clinic2'
patients_count = Patient.query.filter().count()
print(patients_count) # 1 in 'clinic1'
关于引用默认绑定的外键
您必须指定 schema
。
限制:
- MySQL:
- 绑定必须在同一个 MySQL 实例中。否则,它必须是普通列。
- 默认绑定中的外部对象必须已经提交。
否则,当插入一个引用它的对象时,你会得到这个锁错误:
MySQLdb._exceptions.OperationalError: (1205, 'Lock wait timeout exceeded; try restarting transaction')
- SQLite:不强制执行跨数据库的外键。
用法:
# app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://user:pass@localhost/main'
class PatientType(db.Model):
__tablename__ = "patient_types"
__table_args__ = {"schema": "main"} # Add this, based on database name
id = Column(Integer, primary_key=True)
# ...
class Patient(db.Model):
__tablename__ = "patients"
__bind_key__ = "__all__"
id = Column(Integer, primary_key=True)
# ...
# patient_type_id = Column(Integer, ForeignKey("patient_types.id")) # Replace this
patient_type_id = Column(Integer, ForeignKey("main.patient_types.id")) # with this
patient_type = relationship("PatientType")
测试用例:
patient_type = PatientType.query.first()
if not patient_type:
patient_type = PatientType()
db.session.add(patient_type)
db.session.commit() # Commit to reference from other binds
with db.context(bind='clinic1'):
db.session.add(Patient(patient_type=patient_type))
db.session.flush() # Flush in 'clinic1'
with db.context(bind='clinic2'):
patients_count = Patient.query.filter().count()
print(patients_count) # 0 in 'clinic2'
patients_count = Patient.query.filter().count()
print(patients_count) # 1 in 'clinic1'
我有 4 个不同的数据库,每个客户(医疗诊所)都有一个,它们都具有完全相同的结构。
在我的应用中,我有Patient
、Doctor
、Appointment
等模型
我们以其中之一为例:
class Patient(db.Model):
__tablename__ = "patients"
id = Column(Integer, primary_key=True)
first_name = Column(String, index=True)
last_name = Column(String, index=True)
date_of_birth = Column(Date, index=True)
我发现在绑定的帮助下我可以创建不同的数据库并将每个模型关联到不同的绑定。所以我有这个配置:
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://user:pass@localhost/main'
app.config['SQLALCHEMY_BINDS'] = {
'clinic1':'mysql://user:pass@localhost/clinic1',
'clinic2':'mysql://user:pass@localhost/clinic2',
'clinic3':'mysql://user:pass@localhost/clinic3',
'clinic4':'mysql://user:pass@localhost/clinic4'
}
现在我正在努力实现两件事:
- 我希望当我使用
db.create_all()
创建 table 时,它会在所有 4 个数据库 (clinic1->clinic4)[=45] 中创建patients
table =] - 我希望能够动态地选择一个特定的绑定(在 运行 时间内),这样任何诸如
Patient.query.filter().count()
的查询都将 运行 针对所选的绑定数据库
理想情况下,它会像这样:
with DbContext(bind='client1'):
patients_count = Patient.query.filter().count()
print(patients_count)
# outside of the `with` context we are back to the default bind
但是,这样做:
patients_count = Patient.query.filter().count()
不指定绑定,将引发错误(因为 patients
table 在默认绑定中不存在)
任何可以指导如何完成此操作的代码示例将不胜感激!
P.S。您可能会建议不要使用不同的数据库,而是使用具有不同列/tables 的数据库,但请坚持使用我的示例并尝试解释如何使用多个相同数据库的这种模式来完成此操作
谢谢!
1。在所有绑定中创建表
观察:db.create_all()
调用 self.get_tables_for_bind()
。
解决方案:覆盖 SQLAlchemy
get_tables_for_bind()
以支持 '__all__'
.
class MySQLAlchemy(SQLAlchemy):
def get_tables_for_bind(self, bind=None):
result = []
for table in self.Model.metadata.tables.values():
# if table.info.get('bind_key') == bind:
if table.info.get('bind_key') == bind or (bind is not None and table.info.get('bind_key') == '__all__'):
result.append(table)
return result
用法:
# db = SQLAlchemy(app) # Replace this
db = MySQLAlchemy(app) # with this
db.create_all()
2。动态选择特定绑定
观察:SignallingSession
get_bind()
负责确定绑定。
解决方案:
- 覆盖
SignallingSession
get_bind()
以从某些上下文中获取绑定密钥。 - 覆盖
SQLAlchemy
create_session()
以使用我们的自定义会话 class。 - 支持上下文选择
db
上的特定绑定以实现可访问性。 - 通过覆盖
SQLAlchemy
get_binds()
以恢复默认引擎,强制为使用'__all__'
作为绑定键的表指定上下文。
class MySignallingSession(SignallingSession):
def __init__(self, db, *args, **kwargs):
super().__init__(db, *args, **kwargs)
self.db = db
def get_bind(self, mapper=None, clause=None):
if mapper is not None:
info = getattr(mapper.persist_selectable, 'info', {})
if info.get('bind_key') == '__all__':
info['bind_key'] = self.db.context_bind_key
try:
return super().get_bind(mapper=mapper, clause=clause)
finally:
info['bind_key'] = '__all__'
return super().get_bind(mapper=mapper, clause=clause)
class MySQLAlchemy(SQLAlchemy):
context_bind_key = None
@contextmanager
def context(self, bind=None):
_context_bind_key = self.context_bind_key
try:
self.context_bind_key = bind
yield
finally:
self.context_bind_key = _context_bind_key
def create_session(self, options):
return orm.sessionmaker(class_=MySignallingSession, db=self, **options)
def get_binds(self, app=None):
binds = super().get_binds(app=app)
# Restore default engine for table.info.get('bind_key') == '__all__'
app = self.get_app(app)
engine = self.get_engine(app, None)
tables = self.get_tables_for_bind('__all__')
binds.update(dict((table, engine) for table in tables))
return binds
def get_tables_for_bind(self, bind=None):
result = []
for table in self.Model.metadata.tables.values():
if table.info.get('bind_key') == bind or (bind is not None and table.info.get('bind_key') == '__all__'):
result.append(table)
return result
用法:
class Patient(db.Model):
__tablename__ = "patients"
__bind_key__ = "__all__" # Add this
测试用例:
with db.context(bind='clinic1'):
db.session.add(Patient())
db.session.flush() # Flush in 'clinic1'
with db.context(bind='clinic2'):
patients_count = Patient.query.filter().count()
print(patients_count) # 0 in 'clinic2'
patients_count = Patient.query.filter().count()
print(patients_count) # 1 in 'clinic1'
关于引用默认绑定的外键
您必须指定 schema
。
限制:
- MySQL:
- 绑定必须在同一个 MySQL 实例中。否则,它必须是普通列。
- 默认绑定中的外部对象必须已经提交。
否则,当插入一个引用它的对象时,你会得到这个锁错误:MySQLdb._exceptions.OperationalError: (1205, 'Lock wait timeout exceeded; try restarting transaction')
- SQLite:不强制执行跨数据库的外键。
用法:
# app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://user:pass@localhost/main'
class PatientType(db.Model):
__tablename__ = "patient_types"
__table_args__ = {"schema": "main"} # Add this, based on database name
id = Column(Integer, primary_key=True)
# ...
class Patient(db.Model):
__tablename__ = "patients"
__bind_key__ = "__all__"
id = Column(Integer, primary_key=True)
# ...
# patient_type_id = Column(Integer, ForeignKey("patient_types.id")) # Replace this
patient_type_id = Column(Integer, ForeignKey("main.patient_types.id")) # with this
patient_type = relationship("PatientType")
测试用例:
patient_type = PatientType.query.first()
if not patient_type:
patient_type = PatientType()
db.session.add(patient_type)
db.session.commit() # Commit to reference from other binds
with db.context(bind='clinic1'):
db.session.add(Patient(patient_type=patient_type))
db.session.flush() # Flush in 'clinic1'
with db.context(bind='clinic2'):
patients_count = Patient.query.filter().count()
print(patients_count) # 0 in 'clinic2'
patients_count = Patient.query.filter().count()
print(patients_count) # 1 in 'clinic1'