SQLAlchemy 异步事件监听 engine.sync_engine SET 字符串参数替换
SQLAlchemy async event listen engine.sync_engine SET string parameter replacement
我正在尝试监听 engine_connect 事件,在该连接上获取一个游标(cursor),然后发出 SET
查询,但始终无法让参数替换生效。
只有在异步引擎上才会出现这个问题,此时游标类型来自 from sqlalchemy.dialects.postgresql.asyncpg import AsyncAdapt_asyncpg_cursor
(即 AsyncAdapt_asyncpg_cursor)。
请注意,我可以在 SELECT 查询中使用参数替换,但不能在 SET 查询中使用。
这是错误:
sqlalchemy.dialects.postgresql.asyncpg.ProgrammingError: <class 'asyncpg.exceptions.PostgresSyntaxError'>: syntax error at or near "$1"
以下是用于复现该问题的代码片段:
import asyncio
from sqlalchemy import event
from sqlalchemy.engine import make_url
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
# Start from a plain PostgreSQL URL and switch its driver to asyncpg.
db_url = make_url('postgresql://xxx:yyy@127.0.0.1/my_db')
async_db_url = db_url.set(drivername="postgresql+asyncpg")

# Async engine built from the asyncpg URL, with explicit pool settings.
engine = create_async_engine(
    async_db_url,
    future=True,
    pool_pre_ping=True,
    pool_recycle=600,
    pool_timeout=5,
)

# Session factory bound to the engine; it produces AsyncSession objects.
async_session = sessionmaker(
    engine,
    class_=AsyncSession,
    future=True,
    expire_on_commit=False,
)
@event.listens_for(engine.sync_engine, "engine_connect")
def foobar_engine_connect_listener(connection, branch):
    """Reproduce the problem: bind one parameter in a SELECT, then in a SET.

    Registered for the ``engine_connect`` event on ``engine.sync_engine``.
    It obtains a cursor from ``connection.connection`` and runs two
    statements with the same ``%s`` parameter. The first succeeds; the
    second is the statement reported to raise ``PostgresSyntaxError``.

    Args:
        connection: Connection object handed to the listener by the event.
        branch: Second argument supplied by the event; unused here.
    """
    identity = "arn:aws:sts::123456:assumed-role/thing_1"
    bound_values = (identity,)
    cur = connection.connection.cursor()

    # THIS WORKS
    cur.execute("select %s", bound_values)

    # THIS THROWS THE ERROR
    cur.execute("set session iam.identity = %s", bound_values)
async def go():
    """Open one session from ``async_session`` and execute ``select 1``."""
    session_ctx = async_session()
    async with session_ctx as db:
        await db.execute('select 1')


if __name__ == "__main__":
    # Script entry point: run the coroutine to completion.
    asyncio.run(go())
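最终的解决方案是改用 set_config 函数,参数占位符仍然使用 %s 格式。原因在于 PostgreSQL 的 SET 语句不接受绑定参数(asyncpg 会把 %s 转换成服务端占位符 $1),而 set_config 是普通函数,可以在 SELECT 中正常接收参数;第三个参数 false 表示该设置对整个会话生效,相当于 SET SESSION: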
# Assign the setting through set_config() inside a SELECT, so the value can
# be passed as a %s parameter (parameter substitution works in SELECT).
cursor.execute("select set_config('iam.identity', %s, false)", (identity,))
我正在尝试监听 engine_connect 事件,在该连接上获取一个游标(cursor),然后发出 SET
查询,但始终无法让参数替换生效。
只有在异步引擎上才会出现这个问题,此时游标类型来自 from sqlalchemy.dialects.postgresql.asyncpg import AsyncAdapt_asyncpg_cursor
(即 AsyncAdapt_asyncpg_cursor)。
请注意,我可以在 SELECT 查询中使用参数替换,但不能在 SET 查询中使用。
这是错误:
sqlalchemy.dialects.postgresql.asyncpg.ProgrammingError: <class 'asyncpg.exceptions.PostgresSyntaxError'>: syntax error at or near "$1"
以下是用于复现该问题的代码片段:
import asyncio
from sqlalchemy import event
from sqlalchemy.engine import make_url
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
# Start from a plain PostgreSQL URL and switch its driver to asyncpg.
db_url = make_url('postgresql://xxx:yyy@127.0.0.1/my_db')
async_db_url = db_url.set(drivername="postgresql+asyncpg")

# Async engine built from the asyncpg URL, with explicit pool settings.
engine = create_async_engine(
    async_db_url,
    future=True,
    pool_pre_ping=True,
    pool_recycle=600,
    pool_timeout=5,
)

# Session factory bound to the engine; it produces AsyncSession objects.
async_session = sessionmaker(
    engine,
    class_=AsyncSession,
    future=True,
    expire_on_commit=False,
)
@event.listens_for(engine.sync_engine, "engine_connect")
def foobar_engine_connect_listener(connection, branch):
    """Reproduce the problem: bind one parameter in a SELECT, then in a SET.

    Registered for the ``engine_connect`` event on ``engine.sync_engine``.
    It obtains a cursor from ``connection.connection`` and runs two
    statements with the same ``%s`` parameter. The first succeeds; the
    second is the statement reported to raise ``PostgresSyntaxError``.

    Args:
        connection: Connection object handed to the listener by the event.
        branch: Second argument supplied by the event; unused here.
    """
    identity = "arn:aws:sts::123456:assumed-role/thing_1"
    bound_values = (identity,)
    cur = connection.connection.cursor()

    # THIS WORKS
    cur.execute("select %s", bound_values)

    # THIS THROWS THE ERROR
    cur.execute("set session iam.identity = %s", bound_values)
async def go():
    """Open one session from ``async_session`` and execute ``select 1``."""
    session_ctx = async_session()
    async with session_ctx as db:
        await db.execute('select 1')


if __name__ == "__main__":
    # Script entry point: run the coroutine to completion.
    asyncio.run(go())
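最终的解决方案是改用 set_config 函数,参数占位符仍然使用 %s 格式。原因在于 PostgreSQL 的 SET 语句不接受绑定参数(asyncpg 会把 %s 转换成服务端占位符 $1),而 set_config 是普通函数,可以在 SELECT 中正常接收参数;第三个参数 false 表示该设置对整个会话生效,相当于 SET SESSION: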
# Assign the setting through set_config() inside a SELECT, so the value can
# be passed as a %s parameter (parameter substitution works in SELECT).
cursor.execute("select set_config('iam.identity', %s, false)", (identity,))