需要立即使用 FastAPI 发送响应状态代码,同时在后台同步保持作业

Need to send response status code right away with FastAPI while keeping jobs synchronously in the background

我有一个非常耗时的任务(图像处理),它从提供给 FastAPI 端点的请求中接收一些输入数据。为了保持呼叫者响应,我需要发送一条即时响应消息,如 "ok" 以及 201 状态代码(后者可选)。

到目前为止我一直在使用这个:

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

def main_process(parameters)
...some long task

@app.post('/task')
async def do_task(reference_id: int,
              bucket: str,
              document_url: str,
              return_url: str,
              background_tasks: BackgroundTasks):

    background_tasks.add_task(main_process, bucket, document_url, reference_id, return_url)
    return 'ok'

每个 main_process 任务从 S3 中的存储桶下载图像,然后进行一些处理。上面显示的解决方案工作正常,直到它达到大约 10 个异步处理的图像(给定异步定义)然后崩溃。

我也试过增加一些 gunicorn 参数,比如 max-requests 到 100,像这样:

gunicorn api:app -b 0.0.0.0:8000 -w 4 -k uvicorn.workers.UvicornWorker --preload --max-requests 100 --daemon

这给了我更多的处理空间(多了 20 张图像),但它还是崩溃了。

我也考虑过使用 Celery 或一些分布式任务队列解决方案,但我希望尽可能简单。

由于异步行为并不重要,但即时响应才是关键,是否可以切换到同步解决方案但立即获得 "ok" 响应?

不,您必须真正分派任务并将其委托给某些处理后端。这样的后端可以非常简单,例如只是一个任务队列(celery/amqp、redis、关系数据库,任何适合您需要的)和至少一个进程使用该队列,执行计算并将结果反馈回存储。

当您从 API 发送请求时,同时生成一个 UUID 并将其与您的计算作业一起存储在队列中。当您将快速 200 OK 反馈给呼叫者时,还要向他们提供他们工作的 UUID(如果需要)。他们会再次点击您的 API 查询结果;让他们提供 UUID 并使用它在您的存储后端中查找结果。

为避免两次计算相同的请求,从请求中生成一个散列并使用它代替 UUID(注意冲突,您需要一些更长的散列)。只要您不必处理 user/image 权限,这很容易实现。