SQLAlchemy:select 具有复合主键的单个 table 中所有 ID 的最新行

SQLAlchemy: select most recent row for all ids in a single table with composite primary key

我想做 但在 SQL炼金术。唯一的区别是,我不仅希望能够获得最新记录,还希望能够在 给定的时间戳。只要我确保行永远不会被删除,这就允许我查看特定时间戳上的数据库。

假设我的模型是这样的:

from datetime import datetime
from sqlalchemy import Column, Integer, DateTime
from sqlalchemy.ext.declarative include declarative_base
Base = declarative_base()
class User(Base):
    __tablename__ = "users"
    id_ = Column("id", Integer, primary_key=True, index=True, nullable=False)
    timestamp = Column(DateTime, primary_key=True, index=True, nullable=False, default=datetime.utcnow())
    # other non-primary attributes would go here

我有这个 users table(时间戳简化):

| id_ | timestamp |
-------------------
  0     1
  0     4
  0     6
  1     3
  2     7
  2     3

例如,如果我在 timestamp = 4 请求快照,我想得到:

| id_ | timestamp |
-------------------
  0     4
  1     3
  2     3

我能想到的最好的方法是按程序进行:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
db_engine = create_engine(...)
SessionLocal = sessionmaker(bind=db_engine, ...)
db_session = SessionLocal()

def get_snapshot(timestamp: datetime):
    all_versions = db_session.query(User).filter(User.timestamp <= timestamp).order_by(desc(User.timestamp))
    snapshot = []
    for v in all_versions:
        if v.id_ not in (i.id_ for i in snapshots):
            snapshot.append(v)
    return snapshot

但是,这给了我一个模型对象列表而不是 sqlalchemy.orm.query.Query,所以我必须以不同于标准查询的方式处理结果 我的代码的其他部分。这些都可以在 ORM 中完成吗?

提前致谢

你试过了吗:

all_versions = db_session.query(User, func.max(User.timestamp)).\
               filter(User.timestamp <= timestamp).\
               group_by(User.id_)               

您可以在 SQLAlchemy 中阅读有关泛型函数的更多信息here

Matteo 解决方案的替代方法是使用子查询并将其连接到 table,这会以我喜欢的 sqlalchemy.orm.query.Query 对象格式给出结果。感谢 Matteo 的子查询代码:

subq = db_session.query(User.id_, func.max(User.timestamp).label("maxtimestamp")).filter(User.timestamp < timestamp).group_by(User.id_).subquery()
q = db_session.query(User).join(subq, and_(User.id_ == subq.c.id, User.timestamp == subq.c.maxtimestamp))

SQL代

请注意,这可能比 Matteo 的解决方案效率低:

SQL 由子查询生成

SELECT users.id AS users_id, users.timestamp AS users_timestamp, users.name AS users_name, users.notes AS users_notes, users.active AS users_active
FROM users JOIN (SELECT users.id AS id, max(users.timestamp) AS maxtimestamp
FROM users
WHERE users.timestamp < ? GROUP BY users.id) AS anon_1 ON users.id = anon_1.id AND users.timestamp = anon_1.maxtimestamp

SQL 由 Matteo 的解决方案生成:

SELECT users.id AS users_id, users.timestamp AS users_timestamp, users.name AS users_name, users.notes AS users_notes, users.active AS users_active, max(users.timestamp) AS max_1
FROM users
WHERE users.timestamp <= ? GROUP BY users.id

此回答的前一内容

@Matteo Di Napoli

谢谢,您的 post 或多或少是我所需要的。它的输出是一个 sqlalchemy.util._collections.result,从我所见,它的行为就像一个元组。在我的应用程序中,我需要完整的 User 对象,而不仅仅是 id / timestamp 对,所以更适合我的是:

from sqlalchemy import func 

all_versions = db_session.query(User, func.max(User.timestamp)).\
               filter(User.timestamp <= timestamp).\
               group_by(User.id_)

返回如下内容:

> for i in all_versions: print(i)
...
(<User "my test user v2", id 0, modified 2019-06-19 14:42:16.380381>, datetime.datetime(2019, 6, 19, 14, 42, 16, 380381))
(<User "v2", id 1, modified 2019-06-19 15:53:53.147039>, datetime.datetime(2019, 6, 19, 15, 53, 53, 147039))
(<User "a user", id 2, modified 2019-06-20 12:34:56>, datetime.datetime(2019, 6, 20, 12, 34, 56))

然后我可以使用 all_versions[n][0] 访问用户对象或使用 l = [i[0] for i in all_versions] 获取列表(感谢 Matteo Di Napoli 提供了更好的语法)。

完美的最终结果是,如果我能得到一个仍然是 sqlalchemy.orm.query.Query(如 all_versions)的结果,但每个项目都是一个 User 对象而不是 sqlalchemy.util._collections.result。这可能吗?