SQLAlchemy Asyncio ORM 在从元数据中检索表和列时无法查询数据库

SQLAlchemy Asyncio ORM Unable to Query Database When Retrieving Tables and Columns from MetaData

使用 SQLAlchemy 异步 ORM 1.4,Postgres 后端,Python3.7

我在 SA ORM 中使用 augmented Declarative Base。 table 不保存在 models.py 中,而是通过解析包含所有 table 模式的 JSON 脚本直接提交到数据库。因此,我无法在脚本顶部导入模型,如 from models import ThisTable.

因此,为了在 table 上进行 CRUD 操作,我首先通过反映元数据来检索它们。

在 'usual' 方式中,在脚本顶部导入所有 table 时,这样的查询有效:

result = await s.execute(select(func.sum(TableName.column)))
curr = result.all()

当我试图反映元数据中的 table 和列对象以便查询它们时,这不起作用。有很多 AttributeError: 'Table' object has no attribute 'func'TypeError: 'Table' object is not callable 错误。


def retrieve_table_obj(table):
    meta = MetaData()
    meta.reflect(bind=sync_engine)
    return meta.tables[table]

def retrieve_table_cols(self, table):
    table = retrieve_table_obj('users')
    return table.columns.keys()

async def reading(collection, modifications):

    table = db.retrieve_table_obj(collection)
    columns = db.retrieve_table_cols(collection)
    for c in columns:
        for f in mods['fields']:
            if c in f:
                q = select(func.sum(table.c))

result = await s.execute(q)
curr = result.all()

asyncio.run(reading("users", {'fields': ["usage", "allowance"]}))

当首先必须显式检索时,如何查询数据库中的 table 和列?

automap 扩展可用于自动将数据库表反映到 SQLAlchemy 模型。但是 automap 在引擎上使用 inspect,这在异步引擎上不受支持;我们可以通过使用同步引擎进行自动映射来解决这个问题。一旦模型被映射,它们就可以被异步引擎使用。

例如:

import asyncio

import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.ext.automap import automap_base


sync_engine = sa.create_engine('postgresql:///test', echo=True, future=True)

Base = automap_base()
Base.prepare(sync_engine, reflect=True)


async def async_main(collection, modifications):
    engine = create_async_engine(
        "postgresql+asyncpg:///test",
        echo=True,
        future=True,
        connect_args={'ssl': False},
    )

    async_session = orm.sessionmaker(
        engine, class_=AsyncSession, future=True
    )

    async with async_session() as session:
        model = Base.classes[collection]
        matches = set(model.__mapper__.columns.keys()) & set(modifications['fields'])
        for m in matches:
            q = sa.select(sa.func.sum(getattr(model, m)))


            result = await session.execute(q)
            curr = result.all()
            for row in curr:
                print(row)
            print()

    # for AsyncEngine created in function scope, close and
    # clean-up pooled connections
    await engine.dispose()


asyncio.run(reading("users", {'fields': ["usage", "allowance"]}))

如果您不需要模型,缓存 MetaData 对象而不是在每次调用 retrieve_table_obj 时重新创建它将使现有代码更高效,并将 select(func.sum(table.c)) 替换为select(sa.func.sum(getattr(table.c, c)))