使用 sqlalchemy 将一个对象从一个数据库保存到另一个数据库
Persist one object from one database to another using sqlalchemy
我有两个具有完全相同 table 的数据库(均为 Mysql),我想使用 Sqlalchemy 将一些数据从一个数据库复制到另一个数据库。
我可以按照这个问题给出的答案复制简单的对象:
Cannot move object from one database to another
问题是当对象具有来自另一个 table 的依赖项时,我也想复制这些依赖项。
所以为了更清楚,这是我的模型(两个数据库相同,但使用指向不同数据库的不同 bind_key):
db1 = SQLAlchemy()
Class Payment(db.Model):
__tablename__ = 'payments'
__bind_key__ = 'db1'
id = db1.Column(db.Integer, primary_key=True)
paymethod_id = db1.Column(db.Integer(), db1.ForeignKey(PaymentMethod.id))
payment_method = db1.relationship(PaymentMethod)
我想做的是:
from models1 import Payment as Payment1
from models2 import Payment as Payment2
# query from one database
payment1 = db1.session.query(Payment1).first()
# create and add it to the other database
payment2 = Payment2(**payment1.__dict__.copy())
db2.session.add(payment)
db2.session.commit()
但在这种情况下外键失败,因为我还没有存储 PaymentMethod。
是否有不同的方法来做到这一点,或者我必须为我的对象的每个依赖项执行此过程并确保我事先存储子项?
感谢任何帮助:)
我想出了一个解决方案,将对象重新映射到正确的模型并存储其所有子对象。您调用方法 save_obj
并传递要映射的对象。然后它将检索具有相同名称的 table 但随后从您要将对象重新映射到的模型中检索,并且它将递归地对其所有子对象执行相同的操作。您必须在方法 get_model
.
中定义正确的模型
至运行 禁用自动刷新以防止在对象正确形成之前提交是必要的,并且在调用该方法之后也是必要的。我正在使用 flask-sqlalchemy。
希望这可以对面临类似问题的人有所帮助或提供一些见解:)
def save_obj(obj, checked=[]):
if obj in checked:
# if the object was already converted, retrieve the right object
model = get_model(obj.__mapper__.mapped_table.name)
return get_obj(obj, model)
checked.append(obj)
children = []
relations = obj.__mapper__.relationships.items()
# get all the relationships of this model (foreign keys)
for relation in relations:
model = get_model(relation[1].table.name)
if model:
# remove the cascade option for this object, so the children are not stored automatically in the session
relation[1]._cascade = CascadeOptions('')
child = getattr(obj, relation[0])
if not child:
continue
# if the child is a set of children
if isinstance(child, list):
new_children = []
for ch in copy(child):
# convert the child
new_child = save_obj(ch, checked)
new_children.append(new_child)
children.append((relation[0], new_children))
else:
new_child = save_obj(child, checked)
children.append((relation[0], new_child))
# get the model of the object passed
model = get_model(obj.__mapper__.mapped_table.name)
new_obj = get_obj(obj, model)
# set all the children in this object
for child in children:
if child[1]:
setattr(new_obj, child[0], child[1])
checked.append(new_obj)
session.add(new_obj)
return new_obj
def get_model(table_name):
# get the right model for this object
for table in db.Model._decl_class_registry.values():
if hasattr(table, '__tablename__') and table.__tablename__ == table_name:
return table
return None
def create_new_obj(obj, model):
params = obj.__dict__.copy()
params.pop('_sa_instance_state')
return model(**params)
def get_obj(child, model):
# check if the object is already stored in the db
child_in_db = session.query(model).get(child.id)
if child_in_db:
return child_in_db
# check if the object is already in the session
for s in session.new:
if type(s) == model and s.id == child.id:
return s
return create_new_obj(child, model)
我有两个具有完全相同 table 的数据库(均为 Mysql),我想使用 Sqlalchemy 将一些数据从一个数据库复制到另一个数据库。
我可以按照这个问题给出的答案复制简单的对象:
Cannot move object from one database to another
问题是当对象具有来自另一个 table 的依赖项时,我也想复制这些依赖项。
所以为了更清楚,这是我的模型(两个数据库相同,但使用指向不同数据库的不同 bind_key):
db1 = SQLAlchemy()
Class Payment(db.Model):
__tablename__ = 'payments'
__bind_key__ = 'db1'
id = db1.Column(db.Integer, primary_key=True)
paymethod_id = db1.Column(db.Integer(), db1.ForeignKey(PaymentMethod.id))
payment_method = db1.relationship(PaymentMethod)
我想做的是:
from models1 import Payment as Payment1
from models2 import Payment as Payment2
# query from one database
payment1 = db1.session.query(Payment1).first()
# create and add it to the other database
payment2 = Payment2(**payment1.__dict__.copy())
db2.session.add(payment)
db2.session.commit()
但在这种情况下外键失败,因为我还没有存储 PaymentMethod。
是否有不同的方法来做到这一点,或者我必须为我的对象的每个依赖项执行此过程并确保我事先存储子项?
感谢任何帮助:)
我想出了一个解决方案,将对象重新映射到正确的模型并存储其所有子对象。您调用方法 save_obj
并传递要映射的对象。然后它将检索具有相同名称的 table 但随后从您要将对象重新映射到的模型中检索,并且它将递归地对其所有子对象执行相同的操作。您必须在方法 get_model
.
至运行 禁用自动刷新以防止在对象正确形成之前提交是必要的,并且在调用该方法之后也是必要的。我正在使用 flask-sqlalchemy。
希望这可以对面临类似问题的人有所帮助或提供一些见解:)
def save_obj(obj, checked=[]):
if obj in checked:
# if the object was already converted, retrieve the right object
model = get_model(obj.__mapper__.mapped_table.name)
return get_obj(obj, model)
checked.append(obj)
children = []
relations = obj.__mapper__.relationships.items()
# get all the relationships of this model (foreign keys)
for relation in relations:
model = get_model(relation[1].table.name)
if model:
# remove the cascade option for this object, so the children are not stored automatically in the session
relation[1]._cascade = CascadeOptions('')
child = getattr(obj, relation[0])
if not child:
continue
# if the child is a set of children
if isinstance(child, list):
new_children = []
for ch in copy(child):
# convert the child
new_child = save_obj(ch, checked)
new_children.append(new_child)
children.append((relation[0], new_children))
else:
new_child = save_obj(child, checked)
children.append((relation[0], new_child))
# get the model of the object passed
model = get_model(obj.__mapper__.mapped_table.name)
new_obj = get_obj(obj, model)
# set all the children in this object
for child in children:
if child[1]:
setattr(new_obj, child[0], child[1])
checked.append(new_obj)
session.add(new_obj)
return new_obj
def get_model(table_name):
# get the right model for this object
for table in db.Model._decl_class_registry.values():
if hasattr(table, '__tablename__') and table.__tablename__ == table_name:
return table
return None
def create_new_obj(obj, model):
params = obj.__dict__.copy()
params.pop('_sa_instance_state')
return model(**params)
def get_obj(child, model):
# check if the object is already stored in the db
child_in_db = session.query(model).get(child.id)
if child_in_db:
return child_in_db
# check if the object is already in the session
for s in session.new:
if type(s) == model and s.id == child.id:
return s
return create_new_obj(child, model)