如何使用 asyncio 和 postgres 在 python 中进行交易?

How to do transactions in python with asyncio and postgres?

我的RPC方法中有两个操作:

async def my_rpc(self, data):
    async with self.Engine() as conn:
        await conn.execute("SELECT ... FROM MyTable");
        ...  # It seems the table MyTable can be changed by another RPC
        await conn.execute("UPDATA MyTable ...");

另一种 RPC 方法可以在操作 "my_rpc" 完成之前更改数据库(在两次等待 SQL 查询之间)。如何避免这种情况?

self.Engine 的代码(使用引擎 aiopg.sa.create_engine 调用):

class ConnectionContextManager(object):
    def __init__(self, engine):
        self.conn = None
        self.engine = engine

    async def __aenter__(self):
        if self.engine:
            self.conn = await self.engine.acquire()
            return self.conn

    async def __aexit__(self, exc_type, exc, tb):
        try:
            self.engine.release(self.conn)
            self.conn.close()
        finally:
            self.conn = None
            self.engine = None

看来避免混淆的唯一方法是让每个事务都发生在单独的数据库连接中(Python 侧游标不行) 这样做的方法是拥有一个连接池 - 并让您的引擎方法为每个 "async thread".

提供不同的连接

如果 Postgresql 本身的连接器是异步感知的(顺便说一句,您使用的是哪个驱动程序?),那会更容易。或者它上面的数据库包装层。如果不是,您将不得不自己实现这个连接池。我认为 Sqlalchemy 连接池将在这种情况下正常工作,因为独立于在协同例程中使用,连接只会在 async with 块的末尾被释放。

首先,aiopg工作在自动提交模式,这意味着你必须在手动模式下使用事务。 Read more details.

其次,您必须对您在第一条语句中读取的锁定行使用 SELECT FOR UPDATE。 SELECT FOR UPDATE 锁定 select 行,直到事务完成。 Read more details.

async def my_rpc(self, data):
    async with self.Engine() as conn:
        await conn.execute("BEGIN")
        await conn.execute("SELECT ... FROM MyTable WHERE some_clause = some_value FOR UPDATE")
        ...  # It seems the table MyTable can be changed by another RPC
        await conn.execute("UPDATE MyTable SET some_clause=...")
        await conn.execute("""COMMIT""")