阻塞的 Python 异步函数调用也会阻塞另一个异步函数

A blocked Python async function invocation also block another async function

我使用 FastAPI 开发访问 SQL 服务器的数据层 API。 无论使用 pytds 还是 pyodbc, 如果有数据库事务导致任何请求挂起, 所有其他请求都将被阻止。 (即使没有数据库操作)

复制:

  1. 有意做一个可序列化的SQL服务器会话,开始一个事务并且不回滚或提交
    INSERT INTO [dbo].[KVStore] VALUES ('1', '1', 0)
    begin tran
    SET TRANSACTION ISOLATION LEVEL Serializable
    SELECT * FROM [dbo].[KVStore]
  1. 使用异步处理函数向 API 发送请求,如下所示:
    def kv_delete_by_key_2_sql():
            conn = pytds.connect(dsn='192.168.0.1', database=cfg.kvStore_db, user=cfg.kvStore_uid,
                                 password=cfg.kvStore_upwd, port=1435, autocommit=True)
            engine = conn.cursor()
            try:
                sql = "delete KVStore; commit"

                with concurrent.futures.ThreadPoolExecutor() as executor:
                    future = executor.submit(engine.execute, sql)
                    rs = future.result()
                    j = {
                        'success': True,
                        'rowcount': rs.rowcount
                    }
                    return jsonable_encoder(j)
            except Exception as exn:
                j = {
                    'success': False,
                    'reason': exn_handle(exn)
                }
                return jsonable_encoder(j)

    @app.post("/kvStore/delete")
    async def kv_delete(request: Request, type_: Optional[str] = Query(None, max_length=50)):
        request_data = await request.json()

        return kv_delete_by_key_2_sql()

  1. 并向具有异步处理函数的同一应用程序的 API 发送请求,如下所示:

    async def hangit0(request: Request, t: int = Query(0)):
        print(t, datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3])
        await asyncio.sleep(t)
        print(t, datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3])
        j = {
            'success': True
        }
        return jsonable_encoder(j)
    
    @app.get("/kvStore/hangit/")
    async def hangit(request: Request, t: int = Query(0)):
        return await hangit0(request, t)

我预计 step.2 会挂起,step.3 应该在 2 秒后直接 return。 但是,如果事务未提交或回滚,则第 3 步永远不会 return...

如何让这些处理函数同时工作?

原因是 rs = future.result() 实际上是一个阻塞调用 - 请参阅 python docs。不幸的是,executor.submit() 不是 return 可等待的对象(concurrent.futures.Future 不同于 asyncio.Future

您可以使用 asyncio.wrap_future,它需要 concurrent.futures.Future 和 returns asyncio.Future(参见 python docs)。新的 Future 对象是可等待的,因此您可以将阻塞函数转换为异步函数。

一个例子:

import asyncio
import concurrent.futures

async def my_async():
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future = executor.submit(lambda x: x + 1, 1)
        return await asyncio.wrap_future(future)

print(asyncio.run(my_async()))

在您的代码中,只需将 rs = future.result() 更改为 rs = await asyncio.wrap_future(future) 并使整个函数成为 async。那应该会变魔术,祝你好运! :)