在初始查询 sqlalchemy 中限制子集合

Limit child collections in initial query sqlalchemy

我正在构建一个 api,如果用户请求,它可以 return 资源的子资源。例如,usermessages。我希望查询能够限制 returned.

message 个对象的数量

我发现了一个关于限制子集合中对象数量的有用提示 here。基本上,它表示以下流程:

class User(...):
    # ...
    messages = relationship('Messages', order_by='desc(Messages.date)', lazy='dynamic')

user = User.query.one()
users.messages.limit(10)

我的用例有时涉及 return 大量用户。

如果我遵循 link 中的建议并使用 .limit(),那么我将需要遍历对每个用户调用 .limit() 的整个用户集合。这比在创建集合的原始 sql 表达式中使用 LIMIT 效率低得多。

我的问题是,是否可以使用声明式有效地 (N+0) 加载大量对象,同时使用 sqlalchemy 限制其子集合中子集合的数量?

更新

明确地说,以下是我试图避免

users = User.query.all()
messages = {}
for user in users:
    messages[user.id] = user.messages.limit(10).all()

我想做更多类似的事情:

users = User.query.option(User.messages.limit(10)).all()

如果您应用限制然后对其调用 .all(),您将获取所有对象一次,它不会一个一个地获取对象,从而导致您提到的性能问题。

简单地应用限制并获取所有对象。

users = User.query.limit(50).all()
print(len(users))
>>50

或对于子对象/关系

user = User.query.one()
all_messages = user.messages.limit(10).all()


users = User.query.all()
messages = {}
for user in users:
    messages[user.id] = user.messages.limit(10).all()

因此,我认为您需要在第二个查询中加载消息,然后再以某种方式与您的用户关联。 以下是数据库相关的;由于 discussed in this question、mysql 不支持有限制的查询,但 sqlite 至少会解析查询。我没有看计划,看它是否做得很好。 下面的代码会找到你关心的所有消息对象。然后您需要将它们与用户相关联。
我已经对此进行了测试,以确认它生成了一个 sqlite 可以解析的查询;我还没有确认 sqlite 或任何其他数据库对这个查询做了正确的事情。 我不得不作弊并使用文本原语来引用 select 中的外部 user.id 列,因为 SQLAlchemy 一直想在内部 select 子查询中包含一个额外的用户连接。

from sqlalchemy import Column, Integer, String, ForeignKey, alias
from sqlalchemy.sql import text

from sqlalchemy.orm import Session
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key = True)
    name = Column(String)

class Message(Base):
    __tablename__ = 'messages'
    user_id = Column(Integer, ForeignKey(User.id), nullable = False)
    id = Column(Integer, primary_key = True)


s = Session()
m1 = alias(Message.__table__)

user_query = s.query(User) # add any user filtering you want
inner_query = s.query(m1.c.id).filter(m1.c.user_id == text('users.id')).limit(10)
all_messages_you_want = s.query(Message).join(User).filter(Message.id.in_(inner_query))

要将消息与用户相关联,您可以执行以下操作,假设您的消息具有用户关系并且您的用户对象具有一个 got_child_message 方法,可以执行您喜欢的任何操作

users_resulting = user_query.all() #load objects into session and hold a reference
for m in all_messages_you_want: m.user.got_child_message(m)

因为您已经在会话中拥有用户,并且因为该关系位于用户的主键上,所以 m.user 根据身份映射解析为 query.get。 我希望这可以帮助你到达某个地方。

此答案来自 sqlalchemy google group 上的 Mike Bayer。我将其发布在这里以帮助人们: TLDR: 我使用 Mike 的答案 version 1 来解决我的问题,因为在这种情况下,我没有涉及此关系的外键,因此无法使用 LATERAL。版本 1 效果很好,但一定要注意 offset 的效果。它在测试期间让我失望了一段时间,因为我没有注意到它被设置为 0 以外的其他设置。

版本 1 的代码块:

subq = s.query(Messages.date).\
    filter(Messages.user_id == User.id).\
    order_by(Messages.date.desc()).\
    limit(1).offset(10).correlate(User).as_scalar()

q = s.query(User).join(
    Messages,
    and_(User.id == Messages.user_id, Messages.date > subq)
).options(contains_eager(User.messages))

迈克的回答 所以你应该忽略它是否使用"declarative",这与查询无关,实际上首先忽略Query,因为首先这是一个SQL问题。您需要一个 SQL 语句来执行此操作。 SQL 中的哪个查询将从主 table 加载大量行,并连接到每个主 table 的辅助 table 的前十行?

LIMIT 很棘手,因为它实际上不是通常 "relational algebra" 计算的一部分。它不在其中,因为它是对行的人为限制。例如,我对如何做到这一点的第一个想法是错误的:

    select * from users left outer join (select * from messages limit 10) as anon_1 on users.id = anon_1.user_id

这是错误的,因为它只获取聚合中的前十条消息,而忽略了用户。我们想要为每个用户获取前十条消息,这意味着我们需要为每个用户单独执行此操作 "select from messages limit 10"。也就是说,我们需要以某种方式进行关联。相关子查询虽然通常不允许作为 FROM 元素,而只允许作为 SQL 表达式,但它只能 return 单列单行;我们通常不能加入普通香草中的相关子查询 SQL。但是,我们可以在 JOIN 的 ON 子句内部进行关联,以在原版 SQL.

中实现这一点。

但首先,如果我们使用的是现代 Postgresql 版本,我们 可以 打破通常的相关规则并使用名为 LATERAL 的关键字,它允许在 FROM 子句中进行相关。 LATERAL 仅受现代 Postgresql 版本支持,它使这变得简单:

    select * from users left outer join lateral
    (select * from message where message.user_id = users.id order by messages.date desc limit 10) as anon1 on users.id = anon_1.user_id

我们支持 LATERAL 关键字。上面的查询如下所示:

subq = s.query(Messages).\
    filter(Messages.user_id == User.id).\
    order_by(Messages.date.desc()).limit(10).subquery().lateral()

q = s.query(User).outerjoin(subq).\
     options(contains_eager(User.messages, alias=subq))

请注意,在上面,为了 SELECT 用户和消息并将它们生成到 User.messages 集合中,必须使用 "contains_eager()" 选项,为此 "dynamic" 必须离开。这不是唯一的选择,例如,您可以为没有 "dynamic" 的 User.messages 建立第二个关系,或者您可以单独从 query(User, Message) 加载并组织结果元组根据需要。

如果您不使用 Postgresql 或不支持 LATERAL 的 Postgresql 版本,则必须将相关性添加到连接的 ON 子句中。 SQL 看起来像:

select * from users left outer join messages on
users.id = messages.user_id and messages.date > (select date from messages where messages.user_id = users.id order by date desc limit 1 offset 10)

在这里,为了将 LIMIT 塞进去,我们实际上是使用 OFFSET 逐步执行前 10 行,然后执行 LIMIT 1 以获得表示我们想要的每个用户的下限日期的日期。然后我们必须在那个日期进行比较时加入,如果该列没有索引,这可能会很昂贵,如果有重复的日期,也可能不准确。

这个查询看起来像:

subq = s.query(Messages.date).\
    filter(Messages.user_id == User.id).\
    order_by(Messages.date.desc()).\
    limit(1).offset(10).correlate(User).as_scalar()

q = s.query(User).join(
    Messages,
    and_(User.id == Messages.user_id, Messages.date >= subq)
).options(contains_eager(User.messages))

如果没有良好的测试,我不相信这些类型的查询,因此下面的 POC 包括两个版本,包括健全性检查。

from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy.ext.declarative import declarative_base
import datetime

Base = declarative_base()


class User(Base):
    __tablename__ = 'user'
    id = Column(Integer, primary_key=True)
    messages = relationship(
        'Messages', order_by='desc(Messages.date)')

class Messages(Base):
    __tablename__ = 'message'
    id = Column(Integer, primary_key=True)
    user_id = Column(ForeignKey('user.id'))
    date = Column(Date)

e = create_engine("postgresql://scott:tiger@localhost/test", echo=True)
Base.metadata.drop_all(e)
Base.metadata.create_all(e)

s = Session(e)

s.add_all([
    User(id=i, messages=[
        Messages(id=(i * 20) + j, date=datetime.date(2017, 3, j))
        for j in range(1, 20)
    ]) for i in range(1, 51)
])

s.commit()

top_ten_dates = set(datetime.date(2017, 3, j) for j in range(10, 20))


def run_test(q):
    all_u = q.all()
    assert len(all_u) == 50
    for u in all_u:

        messages = u.messages
        assert len(messages) == 10

        for m in messages:
            assert m.user_id == u.id

        received = set(m.date for m in messages)

        assert received == top_ten_dates

# version 1.   no LATERAL

s.close()

subq = s.query(Messages.date).\
    filter(Messages.user_id == User.id).\
    order_by(Messages.date.desc()).\
    limit(1).offset(10).correlate(User).as_scalar()

q = s.query(User).join(
    Messages,
    and_(User.id == Messages.user_id, Messages.date > subq)
).options(contains_eager(User.messages))

run_test(q)

# version 2.  LATERAL

s.close()

subq = s.query(Messages).\
    filter(Messages.user_id == User.id).\
    order_by(Messages.date.desc()).limit(10).subquery().lateral()

q = s.query(User).outerjoin(subq).\
    options(contains_eager(User.messages, alias=subq))

run_test(q)

@melchoirs 的回答是最好的。我基本上把这个放在这里是为了将来的自己

我试过上面提到的答案,它有效,我更需要它来限制关联的数量 returned 在传递到 Marshmallow Serializer 之前。

需要说明的一些问题:

  • 子查询按关联运行,因此它找到相应的 date 以正确地作为基础
  • 考虑 limit/offset 给我从下一个 X(偏移量)开始的 1(限制)记录。因此,第 X 个最旧的记录是什么,然后在主查询中它会返回所有内容。太聪明了
  • 看来,如果关联少于 X 条记录,则 return 没什么,因为偏移量超过了记录,此后主查询不会 return 一条记录。

以上述为模板,我得出了以下答案。最初的 query/count 守卫是由于如果关联记录小于偏移量,则找不到任何内容的问题。此外,我还需要在没有关联的情况下添加一个外部连接。

最后,我发现这个查询有点ORM巫术,不想走那条路。我改为从设备序列化程序中排除 histories,并需要使用 device ID 进行第二次 history 查找。该集合可以分页并使一切更清晰。

这两种方法都有效,只是 why 您需要执行一个查询而不是多个查询。在上面,可能有商业原因需要通过单个查询更有效地取回所有内容。对于我的用例,可读性和约定胜过巫术

@classmethod
    def get_limited_histories(cls, uuid, limit=10):

        count = DeviceHistory.query.filter(DeviceHistory.device_id == uuid).count()

        if count > limit:
            sq = db.session.query(DeviceHistory.created_at) \
                .filter(DeviceHistory.device_id == Device.uuid) \
                .order_by(DeviceHistory.created_at.desc()) \
                .limit(1).offset(limit).correlate(Device)


        return db.session.query(Device).filter(Device.uuid == uuid) \
                .outerjoin(DeviceHistory,
                    and_(DeviceHistory.device_id == Device.uuid, DeviceHistory.created_at > sq)) \
                .options(contains_eager(Device.device_histories)).all()[0]

然后它的行为类似于 Device.query.get(id)Device.get_limited_histories(id)

  • 享受