如何避免使用循环函数阻塞异步事件循环
How to avoid blocking the asyncio event loop with looping functions
我正在使用带有 WebSockets 的 FastAPI 将 SVG“推送”到客户端。 问题是:如果迭代 运行 连续,它们会阻塞异步事件循环,因此套接字无法收听其他消息。
运行循环作为后台任务不合适,因为每次迭代CPU繁重,数据必须返回给客户端。
是否有不同的方法,或者我是否需要从客户端触发每个步骤?我认为 multiprocessing 可以工作,但不确定这如何与 await websocket.send_text()
.
这样的异步代码一起工作
@app.websocket("/ws")
async def read_websocket(websocket: WebSocket) -> None:
await websocket.accept()
while True:
data = await websocket.receive_text()
async def run_continuous_iterations():
#needed to run the steps until the user sends "stop"
while True:
svg_string = get_step_data()
await websocket.send_text(svg_string)
if data == "status":
await run_continuous_iterations()
#this code can't run if the event loop is blocked by run_continuous_iterations
if data == "stop":
is_running = False
print("Stopping process")
"...each iteration is CPU heavy and the data must be returned to the
client".
如, a "coroutine suspends its execution only when it explicitly requests to be suspended", e.g., if there is an call to I/O-bound
operations, such as the ones described here. This, however, does not apply to CPU-bound
operations, such as the ones mentioned here所述。因此,CPU-bound
操作,即使它们在 async def
函数中声明并使用 await
调用,也会阻塞事件循环;因此,任何其他请求都将被阻止。
此外,从您提供的代码片段来看,您似乎希望将数据发送回客户端,同时收听新消息(以检查客户端是否发送了“停止" 停止进程的消息)。因此,await
等待操作完成不是正确的方法,而是启动 thread/process 来执行该任务。解决方案如下。
使用asyncio
的run_in_executor
:
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
is_running = True
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
async def run_continuous_iterations():
while is_running:
svg_string = get_step_data()
await websocket.send_text(svg_string)
if data == "status":
is_running = True
loop = asyncio.get_running_loop()
loop.run_in_executor(None, lambda: asyncio.run(run_continuous_iterations()))
if data == "stop":
is_running = False
print("Stopping process")
except WebSocketDisconnect:
is_running = False
print("Client disconnected")
使用threading
的Thread
:
#... rest of the code is the same as above
if data == "status":
is_running = True
thread = threading.Thread(target=lambda: asyncio.run(run_continuous_iterations()))
thread.start()
#... rest of the code is the same as above
我正在使用带有 WebSockets 的 FastAPI 将 SVG“推送”到客户端。 问题是:如果迭代 运行 连续,它们会阻塞异步事件循环,因此套接字无法收听其他消息。
运行循环作为后台任务不合适,因为每次迭代CPU繁重,数据必须返回给客户端。
是否有不同的方法,或者我是否需要从客户端触发每个步骤?我认为 multiprocessing 可以工作,但不确定这如何与 await websocket.send_text()
.
@app.websocket("/ws")
async def read_websocket(websocket: WebSocket) -> None:
await websocket.accept()
while True:
data = await websocket.receive_text()
async def run_continuous_iterations():
#needed to run the steps until the user sends "stop"
while True:
svg_string = get_step_data()
await websocket.send_text(svg_string)
if data == "status":
await run_continuous_iterations()
#this code can't run if the event loop is blocked by run_continuous_iterations
if data == "stop":
is_running = False
print("Stopping process")
"...each iteration is CPU heavy and the data must be returned to the client".
如I/O-bound
operations, such as the ones described here. This, however, does not apply to CPU-bound
operations, such as the ones mentioned here所述。因此,CPU-bound
操作,即使它们在 async def
函数中声明并使用 await
调用,也会阻塞事件循环;因此,任何其他请求都将被阻止。
此外,从您提供的代码片段来看,您似乎希望将数据发送回客户端,同时收听新消息(以检查客户端是否发送了“停止" 停止进程的消息)。因此,await
等待操作完成不是正确的方法,而是启动 thread/process 来执行该任务。解决方案如下。
使用asyncio
的run_in_executor
:
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
is_running = True
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
async def run_continuous_iterations():
while is_running:
svg_string = get_step_data()
await websocket.send_text(svg_string)
if data == "status":
is_running = True
loop = asyncio.get_running_loop()
loop.run_in_executor(None, lambda: asyncio.run(run_continuous_iterations()))
if data == "stop":
is_running = False
print("Stopping process")
except WebSocketDisconnect:
is_running = False
print("Client disconnected")
使用threading
的Thread
:
#... rest of the code is the same as above
if data == "status":
is_running = True
thread = threading.Thread(target=lambda: asyncio.run(run_continuous_iterations()))
thread.start()
#... rest of the code is the same as above