服务于 ML 模型的 FastAPI 应用程序有阻塞代码?

FastAPI application serving a ML model has blocking code?

我们有一个使用 Flask 提供的 ML 模型。使用 Gatling (https://gatling.io/) 对 Flask 应用程序进行负载测试导致性能非常低。它每秒无法处理大量请求。因此我们已经转向 FastAPI。

在带有 uvicorn 或 gunicorn 的 Docker 容器中本地提供它效果很好。但是我们注意到应用程序在几分钟内没有响应:Gatling Load Test - Local Docker Container

在此图像中,您可以看到应用程序以“批处理”方式响应。在 Kubernetes 集群中提供我们的应用程序会导致容器重启,因为负责的容器不会成功 readiness/liveness 探测。

我们已经在 uvicorn's git 上提出了这个问题。但是,我认为我们不会在那里得到答案。我们认为可能是我们编写的代码阻塞了主线程,因此我们的 FastAPI 应用程序在几分钟内不会响应。

应用端点的片段:

async def verify_client(token: str):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM], audience=AUDIENCE)
    except JWTError:
        raise credentials_exception


@app.post("/score", response_model=cluster_api_models.Response_Model)
async def score(request: cluster_api_models.Request_Model, token: str = Depends(oauth2_scheme)):
    logger.info("Token: {0}".format(token))
    await verify_client(token)
    result = await do_score(request)
    return result

await do_score(request)有所有的预处理和预测代码。它使用 gensim fasttext 模型创建文档向量和 scikit-learn K-Means 模型。 do_score() 定义为 async def do_score(request)。从 documentation of FastAPI 开始,我们认为这足以使我们的应用程序异步。然而它看起来不像。它仍在按顺序处理它,而且它在几分钟内没有响应。该方法还包括一个嵌套的 for 循环 O(n²)...不确定这是否也会导致阻塞代码。

我希望所提供的信息足以让您入门。如果您需要有关代码的更多信息,请告诉我。然后我需要更改代码的一些变量名。非常感谢您!

当然,如果您的应用程序不是完全异步的,某些东西会阻止您的应用程序,async 在这里只是一个奇特的关键字。

即使您使用 async def 定义了一个函数,如果它在下面做了一些阻塞,它也会阻塞您应用程序的整个执行。你不相信吗?测试一下。

@app.get("/dummy")
async def dummy():
    time.sleep(5)

让我们向它发送 3 个并发请求。

for _ in {1..3}; do curl http://127.0.0.1:8000/dummy &; done

这将花费 +15 秒。

让我们深入探讨一下,我说过 async def 只是一种声明协程的奇特语法,为什么?参见 PEP 492

async def functions are always coroutines, even if they do not contain await expressions.

为什么重要?

当你定义一个协程时,使用 await 语法,你说你的事件循环 继续 ,好吧,它这样做了,它切换到另一个协程并且运行它。

有什么区别?

基本上,协同程序不会等待结果,它会一直运行。但是当你定义一个普通函数时,它当然会等待那个函数的执行。

既然我们都知道它会阻塞,你能做什么?

您可能想使用像 Celery 这样的 Job/Task 队列库。

这里的正确答案是使用非异步路由定义。 FastAPI 文档指出,任何包含同步代码的路由都应放入这些类型的路由中,以便它可以将您的代码放入其自己的内部线程池中,从而创建一个伪异步路由。

@app.post("/score", response_model=cluster_api_models.Response_Model)
def score(request: cluster_api_models.Request_Model, token: str = Depends(oauth2_scheme)):
    logger.info("Token: {0}".format(token))
    verify_client(token)
    result = do_score(request)
    return result

这是我引用的来自 fastapi 的文档的link。

https://fastapi.tiangolo.com/async/#path-operation-functions