如何避免使用循环函数阻塞异步事件循环

How to avoid blocking the asyncio event loop with looping functions

我正在使用带有 WebSockets 的 FastAPI 将 SVG“推送”到客户端。 问题是:如果迭代 运行 连续,它们会阻塞异步事件循环,因此套接字无法收听其他消息。

运行循环作为后台任务不合适,因为每次迭代CPU繁重,数据必须返回给客户端。

是否有不同的方法,或者我是否需要从客户端触发每个步骤?我认为 multiprocessing 可以工作,但不确定这如何与 await websocket.send_text().

这样的异步代码一起工作
@app.websocket("/ws")
async def read_websocket(websocket: WebSocket) -> None:
    await websocket.accept()
    while True:
        data = await websocket.receive_text()

        async def run_continuous_iterations():
            #needed to run the steps until the user sends "stop"
            while True:
                svg_string = get_step_data()
                await websocket.send_text(svg_string) 

        if data == "status":
            await run_continuous_iterations()
        #this code can't run if the event loop is blocked by run_continuous_iterations
        if data == "stop":
            is_running = False
            print("Stopping process")

"...each iteration is CPU heavy and the data must be returned to the client".

, a "coroutine suspends its execution only when it explicitly requests to be suspended", e.g., if there is an call to I/O-bound operations, such as the ones described here. This, however, does not apply to CPU-bound operations, such as the ones mentioned here所述。因此,CPU-bound 操作,即使它们在 async def 函数中声明并使用 await 调用,也会阻塞事件循环;因此,任何其他请求都将被阻止。

此外,从您提供的代码片段来看,您似乎希望将数据发送回客户端,同时收听新消息(以检查客户端是否发送了“停止" 停止进程的消息)。因此,await 等待操作完成不是正确的方法,而是启动 thread/process 来执行该任务。解决方案如下。

使用asynciorun_in_executor:

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    is_running = True
    await websocket.accept()
    
    try:
        while True:
            data = await websocket.receive_text()

            async def run_continuous_iterations():
                while is_running:
                    svg_string = get_step_data()
                    await websocket.send_text(svg_string)
                
            if data == "status":
                is_running = True
                loop = asyncio.get_running_loop()
                loop.run_in_executor(None, lambda: asyncio.run(run_continuous_iterations()))

            if data == "stop":
                is_running = False
                print("Stopping process")
                
    except WebSocketDisconnect:
        is_running = False
        print("Client disconnected")  

使用threadingThread:

#...  rest of the code is the same as above
                
if data == "status":
    is_running = True
    thread = threading.Thread(target=lambda: asyncio.run(run_continuous_iterations()))
    thread.start()

#...  rest of the code is the same as above