在 Python Gino 中获取池连接(异步)

Acquiring pool connections in Python Gino (async)

我正在使用 Postgres,Python3.7 和 asyncio + asyncpg + gino(ORM-ish)+ aiohttp(路由,网络响应)。

我在我的数据库 testdb 中创建了一个小的 postgres table users 并插入了一行:

testdb=# select * from users;
 id | nickname
----+----------
  1 | fantix

我正在尝试设置我的数据库,以便在收到请求时可以在路由中使用 ORM。

import time
import asyncio
import gino

DATABASE_URL = os.environ.get('DATABASE_URL')

db = gino.Gino()

class User(db.Model):
  __tablename__ = 'users'
  id = db.Column(db.Integer(), primary_key=True)
  nickname = db.Column(db.Unicode(), default='noname')

kwargs = dict(
  min_size=10,
  max_size=100,
  max_queries=1000,
  max_inactive_connection_lifetime=60 * 5,
  echo=True
)

async def test_engine_implicit():
  await db.set_bind(DATABASE_URL, **kwargs)
  return await User.query.gino.all()      # this works

async def test_engine_explicit():
  engine = await gino.create_engine(DATABASE_URL, **kwargs)
  db.bind = engine
  async with engine.acquire() as conn:
    return await conn.all(User.select())  # this doesn't work!

users = asyncio.get_event_loop().run_until_complete(test_engine_implicit())
print(f'implicit query: {users}')

users = asyncio.get_event_loop().run_until_complete(test_engine_explicit())
print(f'explicit query: {users}')

输出为:

web_1    | INFO gino.engine._SAEngine SELECT users.id, users.nickname FROM users
web_1    | INFO gino.engine._SAEngine ()
web_1    | implicit query: [<db.User object at 0x7fc57be42410>]
web_1    | INFO gino.engine._SAEngine SELECT
web_1    | INFO gino.engine._SAEngine ()
web_1    | explicit query: [()]

这很奇怪。 "explicit" 代码本质上是针对数据库运行裸 SELECT,这是无用的。

我在文档中找不到同时执行 1) 使用 ORM 和 2) 显式检查池中连接的方法。

我的问题:

  1. await User.query.gino.all() 是否检查池中的连接?它是如何发布的?
  2. 我如何将查询包装在事务中?我很不安,因为我无法明确控制何时/何地从池中获取连接,以及如何释放它。

我基本上喜欢 test_engine_explicit() 中风格的明确性以与 Gino 一起工作,但也许我只是不理解 Gino ORM 是如何工作的。

我以前从未使用过 GINO,但快速查看代码后:

  1. GINO 连接简单executes provided clause as is。因此,如果您只提供 User.select(),那么它不会增加任何内容。
  2. 如果你想达到与使用 User.query.gino.all() 相同的效果,但要自己保持连接,那么你可以 follow the docs 并使用 User.query 而不是普通的 User.select():
async with engine.acquire() as conn:
    return await conn.all(User.query) 

刚刚测试过,对我来说效果很好。

关于connections pool,我不确定自己是否答对了,但是Engine.acquire creates a reusable connection by default然后加入到池中,其实就是一个stack:

   :param reusable: Mark this connection as reusable or otherwise. This
      has no effect if it is a reusing connection. All reusable connections
      are placed in a stack, any reusing acquire operation will always
      reuse the top (latest) reusable connection. One reusable connection
      may be reused by several reusing connections - they all share one
      same underlying connection. Acquiring a connection with
      ``reusable=False`` and ``reusing=False`` makes it a cleanly isolated
      connection which is only referenced once here.

在 GINO 中也有一个 manual transaction control,例如您可以创建一个不可重用、不可重用的连接并手动控制事务流:

async with engine.acquire(reuse=False, reusable=False) as conn: 
     tx = await conn.transaction() 
     try: 
          await conn.status("INSERT INTO users(nickname) VALUES('e')") 
          await tx.commit() 
     except Exception: 
          await tx.rollback() 
          raise

至于释放连接,我找不到任何GINO自己释放连接的证据。我猜那个池是由 SQLAlchemy 核心维护的。

我肯定没有直接回答你的问题,但希望它能以某种方式帮助你。