SQLAlchemy 查询构造

SQLAlchemy query construction

我正在使用一个由 Postgres 支持的 Key 模型,它是一个通用的 table 来保存 API 键:

class Key(Model):
    __tablename__ = "keys"
    id = Column(Integer, primarykey=True)
    user_id = Column(Integer, ForeignKey("users.id"))
    brokerage_id = Column(Integer, ForeignKey("brokerages.id"))
    account_id = Column(Integer, ForeignKey("accounts.id"))
    key = Column(String(128))
    value = Column(String(128))

在下面的例子中,user 2 有三个键。这三个都与 brokerage 2account 2 相关联。这由 ID 4 到 6 表示。对于此站点,用户有一个身份验证令牌和两个查询 ID。

id  user_id brokerage_id    account_id      key         value
--------------------------------------------------------------------
4   2       2               2               token       999999999999
5   2       2               2               query_id    888888      
6   2       2               2               query_id    777777      
7   1       2               3               token       444444444444

我正在尝试构建一个查询,这样我的结果将被建模为:

[(user_id, brokerage_id, account_id, token, [query_id_1, query_id_2, ...]), ...]

所以对于上面的例子,它看起来像这样

[(2, 2, 2, 999999999999, [888888, 777777]), (1, 2, 3, 444444444444, [])]

我有以下查询,其中 select 令牌和 query_ids

tokens = db.session.query(
    Key.user_id, Key.brokerage_id, Key.account_id, Key.value
).filter(Key.key=='token').all()

query_ids = db.session.query(
    Key.user_id, Key.brokerage_id, Key.account_id, Key.value
).filter(Key.key=='query_id').all()

我尝试过以各种方式使用 subquery,但无法完全获得与我需要的类似的输出。我如何构建一个查询 return 结果以与上面的元组列表对齐的方式?

结果

感谢@rfkortekaas

,在这里添加最终的工作查询
from sqlalchemy.orm import aliased
from sqlalchemy import func, and_

from project.models import Key
from project.extensions import db

key_token = aliased(Key)

q = db.session.query(
    key_token.user_id,
    key_token.brokerage_id,
    key_token.account_id,
    key_token.value.label('token'),
    func.array_agg(Key.value).label('query_ids')
).join(
    Key,
    and_(
        key_token.user_id == Key.user_id,
        key_token.brokerage_id == Key.brokerage_id,
        key_token.account_id == Key.account_id,
        Key.key == 'query_id'
    )
).filter(
    key_token.key == 'token'
).group_by(
    key_token.user_id,
    key_token.brokerage_id,
    key_token.account_id,
    key_token.value
)
results = q.all()

您可以使用 PostgreSQL 中的 array_agg 函数来创建结果数组:

from sqlalchemy.orm import aliased

key_token = aliased(Key)

stmt = select(key_token.user_id,
              key_token.brokerage_id,
              key_token.account_id,
              key_token.value.label('token'),
              func.array_agg(Key.value).label('query_ids')
              ).join(Key,
                     and_(key_token.user_id == Key.user_id,
                          key_token.brokerage_id == Key.brokerage_id,
                          key_token.account_id == Key.account_id,
                          Key.key == 'query_id'))\
               .where(key_token.key == 'token')\
               .group_by(key_token.user_id,
                         key_token.brokerage_id,
                         key_token.account_id,
                         key_token.value)
keys = session.execute(stmt).all()
for row in keys:
    print(row)

结果:

user_id brokerage_id account_id token query_ids
1 2 3 '44' ['4']
2 2 1 '33" ['6']
2 2 2 '99" ['8', '7]

对于以下数据集:

user_id brokerage_id account_id key value
2 2 2 token '99'
2 2 1 token '33'
2 2 1 query_id '6'
2 2 2 query_id '8'
2 2 2 query_id '7'
1 2 3 token '44'
1 2 3 query_id '4'