Python - Sqlalchemy:如何从多对多关系中获取记录列表
Python - Sqlalchemy: How to get list of the records from many-to-many relationship
我有如下数据库:
+-----+ +------+ +-------+ +-------------+
|Users|-----|Emails|-----|Assoc_T|------|Other_T |
| |1 m| |1 m| |m 1| |
|other| |other | |other | |types |
|data | |data | |data | |other data |
+-----+ +------+ +-------+ +-------------+
简短描述:我有一个用户,可能有很多电子邮件,并且该电子邮件与 Other_T
存在多对多关系
如果我在 SQL 炼金术中有用户对象:
user = UserModel.query.join(UserModel.Emails).filter_by(Email=id).first()
如何获得当前用户的 Other_T
的唯一列表?
我使用下面的方法,但它看起来不正确,即使它确实有效。 (太多的嵌套循环和对数据库的查询)*除非你告诉我必须这样做......
class User(object):
__DBModel = UserModel()
@property
def email(self):
return self.__DBModel.Emails if self.__DBModel else None
def __init__(self, id=None):
if Helpers.is_email(str(id)):
self.__DBModel = UserModel.query.join(UserModel.Emails).filter_by(Email=id).first() if id else UserModel()
elif Helpers.is_number(str(id)):
pass
# THE QUESTION IS HERE: how to get list of OtherT record based on UserModel defined in __init__?
def get_OtherT(self, email=None, other_types=None):
# get list of email that the user have
emails = []
if not email:
emails = self.email.all()
else:
if Helpers.is_email(str(email)):
emails.append(user.email.filter_by(Email=email).first())
else:
return False
# get list of Other_T_ID in Assoc Table
O_T_ID = []
for e in emails:
assoc_other_t = e.EmailAssociations
for assoc in assoc_other_t:
if assoc.Other_T_ID not in O_T_ID:
O_T_ID .append(assoc.Other_T_ID)
# now, after i have the list of the Other_T ID, get the actual Other_T
ret = []
for o in O_T_ID :
ret.append(Other_TModel.query.filter_by(Other_T_ID=o, types=other_types).first()
return ret
这是我的sql炼金术模型:
用户模型
class UserModel(db.Model):
__tablename__ = "Users"
UserID = db.Column(db.Integer, primary_key=True, autoincrement=True)
FirstName = db.Column(db.String(255), nullable=False)
LastName = db.Column(db.String(255), nullable=False)
# relationships
Emails = db.relationship('EmailModel', backref='user', lazy='dynamic')
@orm.reconstructor
def init_on_load(self):
pass
def __init__(self):
pass
EmailModel
class EmailModel(db.Model):
__tablename__ = "Emails"
Email = db.Column(db.String(255), unique=True, primary_key=True, nullable=False)
UserID = db.Column(db.Integer, db.ForeignKey('users.UserID'), nullable=False)
# relationships
EmailAssociations = db.relationship("Assoc_TModel", back_populates="Emails")
@orm.reconstructor
def init_on_load(self):
pass
def __init__(self):
pass
Assoc_TModel
class Assoc_TModel(db.Model):
__tablename__ = 'Assoc_T'
Other_T_ID = db.Column(
db.Integer, db.ForeignKey('Other_T.Other_T_ID'),
primary_key=True, nullable=False
)
Email = db.Column(
db.String(255), db.ForeignKey('Emails.Email'),
primary_key=True, nullable=False
)
EmailVerified = db.Column(db.Boolean, nullable=False, server_default='0')
# relationships
Emails = db.relationship("EmailModel", back_populates="EmailAssociations")
Other_Ts = db.relationship("Other_TModel", back_populates="Other_TAssociations")
@orm.reconstructor
def init_on_load(self):
pass
def __init__(self):
pass
Other_T型号
class Other_TModel(db.Model):
__tablename__ = "Other_T"
Other_T_ID = db.Column(db.Integer, nullable=False, unique=True, primary_key=True, autoincrement=True)
Other_T_Type = db.Column(db.Enum('one', 'two', 'three'), nullable=False, server_default='one')
Other_Data= db.Column(db.String(255), nullable=False)
# relationships
Other_TAssociations= db.relationship("Assoc_TModel", back_populates="Other_Ts")
@orm.reconstructor
def init_on_load(self):
pass
def __init__(self):
pass
谢谢!
您的实施导致过度获取您不需要的数据。
emails = self.email.all()
会将所有电子邮件实体加载到内存中。
assoc_other_t = e.EmailAssociations
将为每个 EmailAssociations
触发额外的 SQL 查询,这将极大地影响您的性能。
您可以使用 subquery
来避免中间提取。
class User(object):
def get_OtherT(self, email=None, other_types=None):
if email and not Helpers.is_email(email):
# I'd recommend an exception here
return False
# Assoc_TModel subquery
# SELECT Other_T_ID FROM Other_T_ID
assoc_sq = session.query(Assoc_TModel.Other_T_ID)
# Handle email predicate
if email:
# Specific email
assoc_sq = assoc_sq.filter(Assoc_TModel.Email == email)
else:
# All emails associated with current user
email_sq = session.query(EmailModel.Email).\
filter(EmailModel.UserID == self.__DBModel.UserID)
assoc_sq = assoc_sq.filter(Assoc_TModel.Email.in_(email_sq))
# Fetch Other_TModel
q = session.query(Other_TModel).\
filter(Other_TModel.Other_T_ID.in_(assoc_sq))
if other_types:
# unclear `other_types` is a list?
q = q.filter(Other_TModel.other_types.in_(other_types))
# or is a scalar value?
q = q.filter(Other_TModel.other_types == other_types)
return q.all()
我有如下数据库:
+-----+ +------+ +-------+ +-------------+
|Users|-----|Emails|-----|Assoc_T|------|Other_T |
| |1 m| |1 m| |m 1| |
|other| |other | |other | |types |
|data | |data | |data | |other data |
+-----+ +------+ +-------+ +-------------+
简短描述:我有一个用户,可能有很多电子邮件,并且该电子邮件与 Other_T
存在多对多关系如果我在 SQL 炼金术中有用户对象:
user = UserModel.query.join(UserModel.Emails).filter_by(Email=id).first()
如何获得当前用户的 Other_T
的唯一列表?
我使用下面的方法,但它看起来不正确,即使它确实有效。 (太多的嵌套循环和对数据库的查询)*除非你告诉我必须这样做......
class User(object):
__DBModel = UserModel()
@property
def email(self):
return self.__DBModel.Emails if self.__DBModel else None
def __init__(self, id=None):
if Helpers.is_email(str(id)):
self.__DBModel = UserModel.query.join(UserModel.Emails).filter_by(Email=id).first() if id else UserModel()
elif Helpers.is_number(str(id)):
pass
# THE QUESTION IS HERE: how to get list of OtherT record based on UserModel defined in __init__?
def get_OtherT(self, email=None, other_types=None):
# get list of email that the user have
emails = []
if not email:
emails = self.email.all()
else:
if Helpers.is_email(str(email)):
emails.append(user.email.filter_by(Email=email).first())
else:
return False
# get list of Other_T_ID in Assoc Table
O_T_ID = []
for e in emails:
assoc_other_t = e.EmailAssociations
for assoc in assoc_other_t:
if assoc.Other_T_ID not in O_T_ID:
O_T_ID .append(assoc.Other_T_ID)
# now, after i have the list of the Other_T ID, get the actual Other_T
ret = []
for o in O_T_ID :
ret.append(Other_TModel.query.filter_by(Other_T_ID=o, types=other_types).first()
return ret
这是我的sql炼金术模型:
用户模型
class UserModel(db.Model):
__tablename__ = "Users"
UserID = db.Column(db.Integer, primary_key=True, autoincrement=True)
FirstName = db.Column(db.String(255), nullable=False)
LastName = db.Column(db.String(255), nullable=False)
# relationships
Emails = db.relationship('EmailModel', backref='user', lazy='dynamic')
@orm.reconstructor
def init_on_load(self):
pass
def __init__(self):
pass
EmailModel
class EmailModel(db.Model):
__tablename__ = "Emails"
Email = db.Column(db.String(255), unique=True, primary_key=True, nullable=False)
UserID = db.Column(db.Integer, db.ForeignKey('users.UserID'), nullable=False)
# relationships
EmailAssociations = db.relationship("Assoc_TModel", back_populates="Emails")
@orm.reconstructor
def init_on_load(self):
pass
def __init__(self):
pass
Assoc_TModel
class Assoc_TModel(db.Model):
__tablename__ = 'Assoc_T'
Other_T_ID = db.Column(
db.Integer, db.ForeignKey('Other_T.Other_T_ID'),
primary_key=True, nullable=False
)
Email = db.Column(
db.String(255), db.ForeignKey('Emails.Email'),
primary_key=True, nullable=False
)
EmailVerified = db.Column(db.Boolean, nullable=False, server_default='0')
# relationships
Emails = db.relationship("EmailModel", back_populates="EmailAssociations")
Other_Ts = db.relationship("Other_TModel", back_populates="Other_TAssociations")
@orm.reconstructor
def init_on_load(self):
pass
def __init__(self):
pass
Other_T型号
class Other_TModel(db.Model):
__tablename__ = "Other_T"
Other_T_ID = db.Column(db.Integer, nullable=False, unique=True, primary_key=True, autoincrement=True)
Other_T_Type = db.Column(db.Enum('one', 'two', 'three'), nullable=False, server_default='one')
Other_Data= db.Column(db.String(255), nullable=False)
# relationships
Other_TAssociations= db.relationship("Assoc_TModel", back_populates="Other_Ts")
@orm.reconstructor
def init_on_load(self):
pass
def __init__(self):
pass
谢谢!
您的实施导致过度获取您不需要的数据。
emails = self.email.all()
会将所有电子邮件实体加载到内存中。
assoc_other_t = e.EmailAssociations
将为每个 EmailAssociations
触发额外的 SQL 查询,这将极大地影响您的性能。
您可以使用 subquery
来避免中间提取。
class User(object):
def get_OtherT(self, email=None, other_types=None):
if email and not Helpers.is_email(email):
# I'd recommend an exception here
return False
# Assoc_TModel subquery
# SELECT Other_T_ID FROM Other_T_ID
assoc_sq = session.query(Assoc_TModel.Other_T_ID)
# Handle email predicate
if email:
# Specific email
assoc_sq = assoc_sq.filter(Assoc_TModel.Email == email)
else:
# All emails associated with current user
email_sq = session.query(EmailModel.Email).\
filter(EmailModel.UserID == self.__DBModel.UserID)
assoc_sq = assoc_sq.filter(Assoc_TModel.Email.in_(email_sq))
# Fetch Other_TModel
q = session.query(Other_TModel).\
filter(Other_TModel.Other_T_ID.in_(assoc_sq))
if other_types:
# unclear `other_types` is a list?
q = q.filter(Other_TModel.other_types.in_(other_types))
# or is a scalar value?
q = q.filter(Other_TModel.other_types == other_types)
return q.all()