如何在 FastAPI 中进行多处理

How to do multiprocessing in FastAPI

在处理 FastAPI 请求时,我有一个 CPU 绑定任务要对列表的每个元素执行。我想在多个 CPU 核心上进行此处理。

在 FastAPI 中执行此操作的正确方法是什么?我可以使用标准 multiprocessing 模块吗?到目前为止,我发现的所有 tutorials/questions 仅涵盖 I/O-bound 网络请求等任务。

async def端点

您可以使用 loop.run_in_executor with ProcessPoolExecutor 在单独的进程中启动函数。

@app.post("/async-endpoint")
async def test_endpoint():
    loop = asyncio.get_event_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound_func)  # wait result

def端点

因为 def 端点是 run implicitly in a separate thread, you can use the full power of modules multiprocessing and concurrent.futures。请注意,在 def 函数内部,可能无法使用 await。样品:

@app.post("/def-endpoint")
def test_endpoint():
    ...
    with multiprocessing.Pool(3) as p:
        result = p.map(f, [1, 2, 3])
@app.post("/def-endpoint/")
def test_endpoint():
    ...
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
      results = executor.map(f, [1, 2, 3])

注意应该记住,在端点中创建进程池以及创建大量线程可能会导致随着请求数量的增加,响应变慢。


即时执行

在单独的进程中执行函数并立即等待结果的最简单和最原生的方法是使用 loop.run_in_executor with ProcessPoolExecutor.

一个池,如下例所示,可以在应用程序启动时创建,不要忘记在应用程序退出时关闭。可以使用 max_workers ProcessPoolExecutor 构造函数参数设置池中使用的进程数。如果 max_workersNone 或未给出,它将默认为机器上的处理器数。

这种方法的缺点是请求处理程序(路径操作)在单独的进程中等待计算完成,而客户端连接保持打开状态。而且如果由于某种原因连接丢失,那么结果将无处可去 return.

import asyncio
from concurrent.futures.process import ProcessPoolExecutor
from fastapi import FastAPI

from calc import cpu_bound_func

app = FastAPI()


async def run_in_process(fn, *args):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(app.state.executor, fn, *args)  # wait and return result


@app.get("/{param}")
async def handler(param: int):
    res = await run_in_process(cpu_bound_func, param)
    return {"result": res}


@app.on_event("startup")
async def on_startup():
    app.state.executor = ProcessPoolExecutor()


@app.on_event("shutdown")
async def on_shutdown():
    app.state.executor.shutdown()

移至后台

通常,CPU绑定任务在后台执行。 FastAPI 提供了 运行 background tasks 成为 运行 after return 响应的能力,你可以在其中启动和异步等待对于您的 CPU 绑定任务的结果。

在这种情况下,例如,您可以立即 return 一个 "Accepted" 的响应(HTTP 代码 202)和一个独特的任务 ID,在后台继续计算,并且客户端稍后可以使用此 ID.

请求任务状态

BackgroundTasks 提供一些功能,特别是,您可以 运行 其中的几个(包括依赖项)。在它们中,您可以使用依赖项中获得的资源,这些资源只有在所有任务完成后才会被清理,而在出现异常的情况下,可以正确处理它们。这在diagram.

中可以看得更清楚

下面是一个执行最小任务跟踪的示例。假定应用程序的一个实例 运行ning。

import asyncio
from concurrent.futures.process import ProcessPoolExecutor
from http import HTTPStatus

from fastapi import BackgroundTasks
from typing import Dict
from uuid import UUID, uuid4
from fastapi import FastAPI
from pydantic import BaseModel, Field

from calc import cpu_bound_func


class Job(BaseModel):
    uid: UUID = Field(default_factory=uuid4)
    status: str = "in_progress"
    result: int = None


app = FastAPI()
jobs: Dict[UUID, Job] = {}


async def run_in_process(fn, *args):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(app.state.executor, fn, *args)  # wait and return result


async def start_cpu_bound_task(uid: UUID, param: int) -> None:
    jobs[uid].result = await run_in_process(cpu_bound_func, param)
    jobs[uid].status = "complete"


@app.post("/new_cpu_bound_task/{param}", status_code=HTTPStatus.ACCEPTED)
async def task_handler(param: int, background_tasks: BackgroundTasks):
    new_task = Job()
    jobs[new_task.uid] = new_task
    background_tasks.add_task(start_cpu_bound_task, new_task.uid, param)
    return new_task


@app.get("/status/{uid}")
async def status_handler(uid: UUID):
    return jobs[uid]


@app.on_event("startup")
async def startup_event():
    app.state.executor = ProcessPoolExecutor()


@app.on_event("shutdown")
async def on_shutdown():
    app.state.executor.shutdown()

更强大的解决方案

上面所有的例子都非常简单,但是如果你需要一些更强大的系统来进行繁重的分布式计算,那么你可以看看消息代理RabbitMQKafkaNATS 等。以及像 Celery 这样使用它们的库。