SQLAlchemy: select most recent row for all ids in a single table with composite primary key
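I want to select the most recent row for each id in a table, but using SQLAlchemy. The only difference is that, rather than only being able to get the latest records, I also want to be able to get the latest records at or before a given timestamp. As long as I make sure rows are never deleted, this lets me view the database as it was at a particular timestamp.
Suppose my model looks like this: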
from datetime import datetime
from sqlalchemy import Column, Integer, DateTime
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class User(Base):
    __tablename__ = "users"
    id_ = Column("id", Integer, primary_key=True, index=True, nullable=False)
    # Pass the callable (no parentheses) so the default is evaluated per insert, not once at import time
    timestamp = Column(DateTime, primary_key=True, index=True, nullable=False, default=datetime.utcnow)
    # other non-primary attributes would go here
I have this users table (timestamps simplified):
| id_ | timestamp |
-------------------
0 1
0 4
0 6
1 3
2 7
2 3
For example, if I request a snapshot at timestamp = 4, I want to get:
| id_ | timestamp |
-------------------
0 4
1 3
2 3
The best approach I could come up with is to do it procedurally:
from sqlalchemy import create_engine, desc
from sqlalchemy.orm import sessionmaker
db_engine = create_engine(...)
SessionLocal = sessionmaker(bind=db_engine, ...)
db_session = SessionLocal()
def get_snapshot(timestamp: datetime):
    # Newest first, so the first row seen for each id is its latest version
    all_versions = db_session.query(User).filter(User.timestamp <= timestamp).order_by(desc(User.timestamp))
    snapshot = []
    for v in all_versions:
        if v.id_ not in (i.id_ for i in snapshot):
            snapshot.append(v)
    return snapshot
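As a side note on the procedural version above: the membership test rescans the snapshot list for every row, which is quadratic in the number of ids. A minimal sketch of the same idea with a dict keyed by id, assuming plain (id_, timestamp) tuples as stand-ins for User objects (the helper name latest_per_id is hypothetical, not part of the original code):

```python
from typing import Dict, Iterable, List, Tuple

Row = Tuple[int, int]  # (id_, timestamp); a stand-in for a User object


def latest_per_id(rows: Iterable[Row], cutoff: int) -> List[Row]:
    """Keep, for each id, the row with the greatest timestamp <= cutoff."""
    latest: Dict[int, Row] = {}
    for id_, ts in rows:
        if ts > cutoff:
            continue  # row is newer than the requested snapshot
        if id_ not in latest or ts > latest[id_][1]:
            latest[id_] = (id_, ts)
    return sorted(latest.values())


# Sample data from the table above
rows = [(0, 1), (0, 4), (0, 6), (1, 3), (2, 7), (2, 3)]
print(latest_per_id(rows, 4))  # [(0, 4), (1, 3), (2, 3)]
```

This does not need the input to be sorted, but it still returns a plain list rather than a Query, so it does not address the main question.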
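However, this gives me a list of model objects rather than a sqlalchemy.orm.query.Query, so I have to handle the result differently from the standard queries in other parts of my code. Can this all be done within the ORM?
Thanks in advance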
Have you tried:
all_versions = db_session.query(User, func.max(User.timestamp)).\
filter(User.timestamp <= timestamp).\
group_by(User.id_)
The SQLAlchemy documentation has more on generic functions.
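An alternative to Matteo's solution is to use a subquery and join it to the table, which gives the result in the form I prefer, a sqlalchemy.orm.query.Query object. Thanks to Matteo for the subquery code: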
from sqlalchemy import and_, func
subq = db_session.query(User.id_, func.max(User.timestamp).label("maxtimestamp")).filter(User.timestamp < timestamp).group_by(User.id_).subquery()
q = db_session.query(User).join(subq, and_(User.id_ == subq.c.id, User.timestamp == subq.c.maxtimestamp))
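To try the subquery join end to end, here is a minimal, self-contained sketch. It makes several assumptions that are not in the original: an in-memory SQLite database, a trimmed-down User model, a hypothetical helper named snapshot_query, the day of the month standing in for the simplified timestamps, and an explicit "uid" label on the id column so the subquery column name does not depend on the SQLAlchemy version. It uses an inclusive `<=` filter so that a row stamped exactly at the cutoff is part of the snapshot, as in the question's example.

```python
from datetime import datetime

from sqlalchemy import Column, DateTime, Integer, and_, create_engine, func
from sqlalchemy.orm import sessionmaker

try:
    from sqlalchemy.orm import declarative_base  # SQLAlchemy 1.4+
except ImportError:
    from sqlalchemy.ext.declarative import declarative_base  # older releases

Base = declarative_base()


class User(Base):
    __tablename__ = "users"
    id_ = Column("id", Integer, primary_key=True)
    timestamp = Column(DateTime, primary_key=True)


engine = create_engine("sqlite://")  # in-memory database, for illustration only
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

# Sample data from the question; the day of the month plays the simplified timestamp.
for id_, day in [(0, 1), (0, 4), (0, 6), (1, 3), (2, 7), (2, 3)]:
    session.add(User(id_=id_, timestamp=datetime(2019, 6, day)))
session.commit()


def snapshot_query(session, cutoff):
    """Return a Query yielding the latest User row per id at or before cutoff."""
    subq = (
        session.query(
            User.id_.label("uid"),
            func.max(User.timestamp).label("maxtimestamp"),
        )
        .filter(User.timestamp <= cutoff)
        .group_by(User.id_)
        .subquery()
    )
    return session.query(User).join(
        subq,
        and_(User.id_ == subq.c.uid, User.timestamp == subq.c.maxtimestamp),
    )


q = snapshot_query(session, datetime(2019, 6, 4)).order_by(User.id_)
result = [(u.id_, u.timestamp.day) for u in q]
print(result)  # [(0, 4), (1, 3), (2, 3)]
```

Because the helper returns a Query, further calls such as filter() or order_by() can still be chained onto it, and iterating it yields User instances rather than tuples.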
Generated SQL
Note that this may be less efficient than Matteo's solution:
SQL generated by the subquery approach:
SELECT users.id AS users_id, users.timestamp AS users_timestamp, users.name AS users_name, users.notes AS users_notes, users.active AS users_active
FROM users JOIN (SELECT users.id AS id, max(users.timestamp) AS maxtimestamp
FROM users
WHERE users.timestamp < ? GROUP BY users.id) AS anon_1 ON users.id = anon_1.id AND users.timestamp = anon_1.maxtimestamp
SQL generated by Matteo's solution:
SELECT users.id AS users_id, users.timestamp AS users_timestamp, users.name AS users_name, users.notes AS users_notes, users.active AS users_active, max(users.timestamp) AS max_1
FROM users
WHERE users.timestamp <= ? GROUP BY users.id
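One detail in the two statements above is easy to miss: the subquery version filters with `<` while Matteo's version filters with `<=`. The sketch below runs the join form against the sample data using Python's built-in sqlite3 module to show what that difference does at timestamp 4. The integer timestamps, the pared-down table, and the "latest" alias are assumptions made for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users (id INTEGER NOT NULL, timestamp INTEGER NOT NULL, "
    "PRIMARY KEY (id, timestamp))"
)
conn.executemany(
    "INSERT INTO users (id, timestamp) VALUES (?, ?)",
    [(0, 1), (0, 4), (0, 6), (1, 3), (2, 7), (2, 3)],
)

# Same shape as the join-on-subquery statement; {op} is the comparison under test.
JOIN_SQL = """
SELECT users.id, users.timestamp
FROM users JOIN (
    SELECT id, max(timestamp) AS maxtimestamp
    FROM users
    WHERE timestamp {op} ?
    GROUP BY id
) AS latest ON users.id = latest.id AND users.timestamp = latest.maxtimestamp
ORDER BY users.id
"""

inclusive = conn.execute(JOIN_SQL.format(op="<="), (4,)).fetchall()
exclusive = conn.execute(JOIN_SQL.format(op="<"), (4,)).fetchall()
print(inclusive)  # [(0, 4), (1, 3), (2, 3)]
print(exclusive)  # [(0, 1), (1, 3), (2, 3)]
```

With `<=` the result matches the snapshot requested in the question; with `<` the row stamped exactly at the cutoff is skipped and id 0 falls back to its earlier version.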
Previous content of this answer
@Matteo Di Napoli
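Thanks, your post is more or less what I needed. Its output is a sqlalchemy.util._collections.result, which, from what I can see, behaves like a tuple. In my application I need the full User objects, not just id / timestamp pairs, so what works better for me is: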
from sqlalchemy import func
all_versions = db_session.query(User, func.max(User.timestamp)).\
filter(User.timestamp <= timestamp).\
group_by(User.id_)
which returns something like:
> for i in all_versions: print(i)
...
(<User "my test user v2", id 0, modified 2019-06-19 14:42:16.380381>, datetime.datetime(2019, 6, 19, 14, 42, 16, 380381))
(<User "v2", id 1, modified 2019-06-19 15:53:53.147039>, datetime.datetime(2019, 6, 19, 15, 53, 53, 147039))
(<User "a user", id 2, modified 2019-06-20 12:34:56>, datetime.datetime(2019, 6, 20, 12, 34, 56))
I can then access the User object with all_versions[n][0], or get a list with l = [i[0] for i in all_versions] (thanks to Matteo Di Napoli for the nicer syntax).
The ideal end result would be something that is still a sqlalchemy.orm.query.Query (like all_versions), but where each item is a User object rather than a sqlalchemy.util._collections.result. Is that possible?