FastAPI runs api-calls in serial instead of parallel fashion

I have the following code:

import time
from fastapi import FastAPI, Request

app = FastAPI()

@app.get("/ping")
async def ping(request: Request):
    print("Hello")
    time.sleep(5)
    print("bye")
    return {"ping": "pong!"}

If I run my code on localhost - e.g., http://localhost:8501/ping - in different tabs of the same browser window, I get:

Hello
bye
Hello
bye

instead of:

Hello
Hello
bye
bye

I have read about using httpx, but still, I cannot achieve true parallelisation. What's the problem?

Q :
" ... What's the problem? "

A :
FastAPI's documentation explicitly says that the framework uses in-process tasks (inherited from Starlette).

That, by itself, means that all such tasks compete to (from time to time) acquire the Python interpreter's GIL-lock - effectively a MUTEX-terrorising Global Interpreter Lock, which in effect re-[SERIAL]-ises any and all amounts of Python-interpreter in-process threads
to work as one-and-only-one-WORKS-while-all-others-stay-waiting...

On a fine-grain scale, you see the result: if spawning another handler for the second http-request (manually initiated from a second FireFox tab) actually takes longer than the sleep, it is the result of the GIL-lock interleaved, round-robin time-quanta (all-wait-one-can-work for one switch interval - sys.getswitchinterval(), 5 [ms] by default in CPython 3 - before each next round of the GIL-lock release-acquire roulette takes place). The Python interpreter's internals do not show more details than that, but you can use additional tooling (depending on the O/S type or version) from here to see more in-thread level-of-detail (LoD), like this, from inside the async-decorated code being executed:

import time
import threading
from   fastapi import FastAPI, Request

TEMPLATE = "INF[{0:_>20d}]: t_id( {1: >20d} ):: {2:}"

print( TEMPLATE.format( time.perf_counter_ns(),
                        threading.get_ident(),
                       "Python Interpreter __main__ was started ..."
                        )
       )
...
app = FastAPI()
@app.get("/ping")
async def ping( request: Request ):
        """                                __doc__
        [DOC-ME]
        ping( Request ):  a mock-up AS-IS function to yield
                          a CLI/GUI self-evidence of the order-of-execution
        RETURNS:          a JSON-alike decorated dict

        [TEST-ME]         ...
        """
        print( TEMPLATE.format( time.perf_counter_ns(),
                                threading.get_ident(),
                               "Hello..."
                                )
               )
        #------------------------------------------------- actual blocking work
        time.sleep( 5 )
        #------------------------------------------------- actual blocking work
        print( TEMPLATE.format( time.perf_counter_ns(),
                                threading.get_ident(),
                               "...bye"
                                )
               )
        return { "ping": "pong!" }

Last, but not least, do not hesitate to read more about all the other sharks that threads-based code may suffer from... or even cause... behind the scenes...

Ad memorandum

A mixture of GIL-lock, thread-based pools, async-decorators, blocking and event-handling - surely a mix of uncertainty and HWY2HELL ;o)
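The serialising effect of the GIL described above can be demonstrated with a plain-stdlib sketch (independent of FastAPI; the countdown workload and iteration count are illustrative choices, not from the original post): two CPU-bound threads take about as long as running the same work back-to-back, because only one thread can hold the interpreter at any moment.

```python
import sys
import threading
import time

def countdown(n):
    # pure-Python CPU-bound loop - never voluntarily releases the GIL
    while n > 0:
        n -= 1

N = 5_000_000

# run the two workloads sequentially
t0 = time.perf_counter()
countdown(N)
countdown(N)
sequential = time.perf_counter() - t0

# run the same two workloads in two threads
t0 = time.perf_counter()
threads = [threading.Thread(target=countdown, args=(N,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
threaded = time.perf_counter() - t0

print(f"switch interval: {sys.getswitchinterval()} s")
print(f"sequential: {sequential:.2f} s, threaded: {threaded:.2f} s")
# on GIL-bound CPython, the threaded run shows no ~2x speedup
```

On a GIL-bound CPython, the two timings come out roughly equal (threads may even be slightly slower, due to lock contention), which is exactly the re-[SERIAL]-isation described above.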

According to FastAPI's documentation:

When you declare a path operation function with normal def instead of async def, it is run in an external threadpool that is then awaited, instead of being called directly (as it would block the server).

Thus, def (synchronous) routes run in a separate thread from a threadpool, or, in other words, the server processes the requests concurrently, whereas async def routes run on the main (single) thread, i.e., the server processes the requests sequentially - as long as there is no call to I/O-bound operations inside such routes, such as waiting for data from the client to be sent through the network, contents of a file on the disk to be read, a database operation to finish, etc. - have a look here.

Asynchronous code with async and await is many times summarised as using coroutines. Coroutines are collaborative (or cooperatively multitasked): "at any given time, a program with coroutines is running only one of its coroutines, and this running coroutine suspends its execution only when it explicitly requests to be suspended" (see here and here for more info on coroutines). However, this does not apply to CPU-bound operations, such as the ones described here. CPU-bound operations, even if declared in an async def function and called with await, will block the main thread. This also means that a blocking operation inside an async def route, such as time.sleep(), will block the entire server (as in your case).
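The difference can be seen with a plain asyncio sketch (standard library only, no FastAPI involved): two coroutines that await asyncio.sleep() overlap their waits, whereas two coroutines that call the blocking time.sleep() hold the event loop and run strictly one after the other.

```python
import asyncio
import time

async def cooperative():
    # yields control back to the event loop while sleeping
    await asyncio.sleep(1)

async def blocking():
    # holds the event loop for the whole second
    time.sleep(1)

async def timed(coro_factory):
    # run two instances concurrently and measure the total wall time
    t0 = time.perf_counter()
    await asyncio.gather(coro_factory(), coro_factory())
    return time.perf_counter() - t0

overlapped = asyncio.run(timed(cooperative))  # ~1 s: the two awaits overlap
serialised = asyncio.run(timed(blocking))     # ~2 s: the loop is blocked twice
print(f"awaited: {overlapped:.2f} s, blocking: {serialised:.2f} s")
```

The same thing happens inside an async def route: an awaited asyncio.sleep() lets the second request's handler run, while time.sleep() makes the second request wait its turn.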

So, if your function is not going to make any async calls, you could declare it with def instead, as shown below:

@app.get("/ping")
def ping(request: Request):
    #print(request.client)
    print("Hello")
    time.sleep(5)
    print("bye")
    return "pong"

Otherwise, if you have to call async functions that must be awaited, you should use async def. To demonstrate this, the example below uses the asyncio.sleep() function from the asyncio library. A similar example is given here as well.

import asyncio
 
@app.get("/ping")
async def ping(request: Request):
    print("Hello")
    await asyncio.sleep(5)
    print("bye")
    return "pong"

Both of the functions above will print the expected output - as mentioned in your question - if two requests arrive at (about) the same time:

Hello
Hello
bye
bye

Note: When calling your endpoint for the second (third, and so on) time, please remember to do that from a tab that is isolated from the browser's main session; otherwise, the requests would appear as coming from the same client (you can check that using print(request.client) - both tabs would show the same client, if they were opened in the same window), and hence, the requests would be processed sequentially. You could either reload the same tab (while it is still running), or open a new tab in an incognito window, or use another browser/client to send the request.

Async/await and expensive CPU-bound operations (long computation tasks)

If you are required to use async def (as you might need to await for coroutines inside your route), but also have some synchronous long computation task that might block the server and wouldn't let other requests go through, for example:

@app.post("/ping")
async def ping(file: UploadFile = File(...)):
    print("Hello")
    try:
        contents = await file.read()
        res = some_long_computation_task(contents)  # this blocks other requests
    finally:
        await file.close()
    print("bye")
    return "pong"

then:

  1. Use more workers (e.g., uvicorn main:app --workers 4). Note: Each worker "has its own things, variables and memory". This means that global variables/objects, etc., won't be shared across the processes/workers. In this case, you should consider using a database storage, or Key-Value stores (Caches) instead, as described here and here. Additionally, note that "if you are consuming a large amount of memory in your code, each process will consume an equivalent amount of memory".

  2. Use FastAPI's (Starlette's) run_in_threadpool() from the concurrency module (source code here and here) - as @tiangolo suggested here - which "will run the function in a separate thread to ensure that the main thread (where coroutines are run) does not get blocked" (see here). As described by @tiangolo here, "run_in_threadpool is an awaitable function; the first parameter is a normal function, the next parameters are passed to that function directly. It supports both sequence arguments and keyword arguments".

    from fastapi.concurrency import run_in_threadpool
    res = await run_in_threadpool(some_long_computation_task, contents)
    
  3. Alternatively, use asyncio's loop.run_in_executor():

    loop = asyncio.get_running_loop()
    res = await loop.run_in_executor(None, lambda: some_long_computation_task(contents))
    
  4. You should also check whether you could change your route's definition to def. For instance, if the only method in your endpoint that has to be awaited is the one reading the file contents (as you mentioned in the comments section below), FastAPI could read the bytes of the file for you (however, that should work for small files, as the whole contents would be stored in memory - have a look here), or you could even call the read() method of the SpooledTemporaryFile object directly, so that you don't have to await the read() method - and since you can now declare your route with def, each request will run in a separate thread:

    @app.post("/ping")
    def ping(file: UploadFile = File(...)):
        print("Hello")
        try:
            contents = file.file.read()
            res = some_long_computation_task(contents)
        finally:
            file.file.close()
        print("bye")
        return "pong"
    
  5. Have a look here, as well as at the documentation here, for more suggested solutions.
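Options 2 and 3 above boil down to the same idea: push the blocking call onto a worker thread and await its result, so the event loop stays free for other requests. A plain-stdlib sketch of option 3 (the function name and workload are illustrative stand-ins, not from the original post) - a "heartbeat" coroutine keeps ticking while the blocking task runs in the executor, which it could not do if the task ran on the event loop directly:

```python
import asyncio
import time

def some_long_computation_task(data: bytes) -> int:
    # stand-in for a blocking / CPU-bound task
    time.sleep(1)
    return len(data)

async def heartbeat(ticks: list):
    # keeps ticking only if the event loop is not blocked
    for _ in range(5):
        await asyncio.sleep(0.1)
        ticks.append(time.perf_counter())

async def main():
    loop = asyncio.get_running_loop()
    ticks = []
    # offload the blocking call to the default thread pool and
    # run the heartbeat concurrently on the event loop
    result, _ = await asyncio.gather(
        loop.run_in_executor(None, some_long_computation_task, b"hello"),
        heartbeat(ticks),
    )
    return result, ticks

result, ticks = asyncio.run(main())
print(result, len(ticks))
```

All five heartbeats fire during the one-second blocking call, showing the loop was never blocked; replacing the run_in_executor() line with a direct some_long_computation_task(b"hello") call would delay every tick until the task finished. FastAPI's run_in_threadpool() (option 2) achieves the same effect with a slightly higher-level API.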