阻塞的 Python 异步函数调用也会阻塞另一个异步函数
A blocked Python async function invocation also block another async function
我使用 FastAPI 开发访问 SQL 服务器的数据层 API。
无论使用 pytds 还是 pyodbc,
如果有数据库事务导致任何请求挂起,
所有其他请求都将被阻止。 (即使没有数据库操作)
复制:
- 有意做一个可序列化的SQL服务器会话,开始一个事务并且不回滚或提交
INSERT INTO [dbo].[KVStore] VALUES ('1', '1', 0)
begin tran
SET TRANSACTION ISOLATION LEVEL Serializable
SELECT * FROM [dbo].[KVStore]
- 使用异步处理函数向 API 发送请求,如下所示:
def kv_delete_by_key_2_sql():
conn = pytds.connect(dsn='192.168.0.1', database=cfg.kvStore_db, user=cfg.kvStore_uid,
password=cfg.kvStore_upwd, port=1435, autocommit=True)
engine = conn.cursor()
try:
sql = "delete KVStore; commit"
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(engine.execute, sql)
rs = future.result()
j = {
'success': True,
'rowcount': rs.rowcount
}
return jsonable_encoder(j)
except Exception as exn:
j = {
'success': False,
'reason': exn_handle(exn)
}
return jsonable_encoder(j)
@app.post("/kvStore/delete")
async def kv_delete(request: Request, type_: Optional[str] = Query(None, max_length=50)):
request_data = await request.json()
return kv_delete_by_key_2_sql()
- 并向具有异步处理函数的同一应用程序的 API 发送请求,如下所示:
async def hangit0(request: Request, t: int = Query(0)):
print(t, datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3])
await asyncio.sleep(t)
print(t, datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3])
j = {
'success': True
}
return jsonable_encoder(j)
@app.get("/kvStore/hangit/")
async def hangit(request: Request, t: int = Query(0)):
return await hangit0(request, t)
我预计 step.2 会挂起,step.3 应该在 2 秒后直接 return。
但是,如果事务未提交或回滚,则第 3 步永远不会 return...
如何让这些处理函数同时工作?
原因是 rs = future.result()
实际上是一个阻塞调用 - 请参阅 python docs。不幸的是,executor.submit()
不是 return 可等待的对象(concurrent.futures.Future
不同于 asyncio.Future
。
您可以使用 asyncio.wrap_future
,它需要 concurrent.futures.Future
和 returns asyncio.Future
(参见 python docs)。新的 Future
对象是可等待的,因此您可以将阻塞函数转换为异步函数。
一个例子:
import asyncio
import concurrent.futures
async def my_async():
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(lambda x: x + 1, 1)
return await asyncio.wrap_future(future)
print(asyncio.run(my_async()))
在您的代码中,只需将 rs = future.result()
更改为 rs = await asyncio.wrap_future(future)
并使整个函数成为 async
。那应该会变魔术,祝你好运! :)
我使用 FastAPI 开发访问 SQL 服务器的数据层 API。 无论使用 pytds 还是 pyodbc, 如果有数据库事务导致任何请求挂起, 所有其他请求都将被阻止。 (即使没有数据库操作)
复制:
- 有意做一个可序列化的SQL服务器会话,开始一个事务并且不回滚或提交
INSERT INTO [dbo].[KVStore] VALUES ('1', '1', 0)
begin tran
SET TRANSACTION ISOLATION LEVEL Serializable
SELECT * FROM [dbo].[KVStore]
- 使用异步处理函数向 API 发送请求,如下所示:
def kv_delete_by_key_2_sql():
conn = pytds.connect(dsn='192.168.0.1', database=cfg.kvStore_db, user=cfg.kvStore_uid,
password=cfg.kvStore_upwd, port=1435, autocommit=True)
engine = conn.cursor()
try:
sql = "delete KVStore; commit"
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(engine.execute, sql)
rs = future.result()
j = {
'success': True,
'rowcount': rs.rowcount
}
return jsonable_encoder(j)
except Exception as exn:
j = {
'success': False,
'reason': exn_handle(exn)
}
return jsonable_encoder(j)
@app.post("/kvStore/delete")
async def kv_delete(request: Request, type_: Optional[str] = Query(None, max_length=50)):
request_data = await request.json()
return kv_delete_by_key_2_sql()
- 并向具有异步处理函数的同一应用程序的 API 发送请求,如下所示:
async def hangit0(request: Request, t: int = Query(0)):
print(t, datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3])
await asyncio.sleep(t)
print(t, datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3])
j = {
'success': True
}
return jsonable_encoder(j)
@app.get("/kvStore/hangit/")
async def hangit(request: Request, t: int = Query(0)):
return await hangit0(request, t)
我预计 step.2 会挂起,step.3 应该在 2 秒后直接 return。 但是,如果事务未提交或回滚,则第 3 步永远不会 return...
如何让这些处理函数同时工作?
原因是 rs = future.result()
实际上是一个阻塞调用 - 请参阅 python docs。不幸的是,executor.submit()
不是 return 可等待的对象(concurrent.futures.Future
不同于 asyncio.Future
。
您可以使用 asyncio.wrap_future
,它需要 concurrent.futures.Future
和 returns asyncio.Future
(参见 python docs)。新的 Future
对象是可等待的,因此您可以将阻塞函数转换为异步函数。
一个例子:
import asyncio
import concurrent.futures
async def my_async():
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(lambda x: x + 1, 1)
return await asyncio.wrap_future(future)
print(asyncio.run(my_async()))
在您的代码中,只需将 rs = future.result()
更改为 rs = await asyncio.wrap_future(future)
并使整个函数成为 async
。那应该会变魔术,祝你好运! :)