SQLAlchemy 异步事件监听 engine.sync_engine SET 字符串参数替换

SQLAlchemy async event listen engine.sync_engine SET string parameter replacement

我正在尝试监听 engine_connect 事件,将光标放在该连接上,然后发出 SET 查询,但我似乎无法使参数替换起作用。我只有使用 from sqlalchemy.dialects.postgresql.asyncpg import AsyncAdapt_asyncpg_cursor 作为光标的异步引擎有这个问题。 请注意,我可以在 SELECT 查询中使用参数替换,但不能在 SET 查询中使用。

这是错误:

sqlalchemy.dialects.postgresql.asyncpg.ProgrammingError: <class 'asyncpg.exceptions.PostgresSyntaxError'>: syntax error at or near ""

这里是要重现的代码片段

import asyncio

from sqlalchemy import event
from sqlalchemy.engine import make_url
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

db_url = make_url('postgresql://xxx:yyy@127.0.0.1/my_db')

async_db_url = db_url.set(drivername="postgresql+asyncpg")
engine = create_async_engine(
    async_db_url,
    pool_timeout=5,
    pool_pre_ping=True,
    pool_recycle=600,
    future=True,
)

async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False, future=True,)


def foobar_engine_connect_listener(connection, branch):
    identity = "arn:aws:sts::123456:assumed-role/thing_1"
    cursor = connection.connection.cursor()
    
    # THIS WORKS 
    cursor.execute(
        "select %s", (identity,)
    )
    
    # THIS THROWS THE ERROR
    cursor.execute(
        "set session iam.identity = %s", (identity,)
    )


event.listens_for(engine.sync_engine, "engine_connect")(foobar_engine_connect_listener)


async def go():
    async with async_session() as session:
        await session.execute('select 1')


if __name__ == "__main__":
    asyncio.run(go())

解决方案最终是 set_config,参数格式为 %s:

cursor.execute("select set_config('iam.identity', %s, false)", (identity,))