需要立即使用 FastAPI 发送响应状态代码,同时在后台同步保持作业
Need to send response status code right away with FastAPI while keeping jobs synchronously in the background
我有一个非常耗时的任务(图像处理),它从提供给 FastAPI 端点的请求中接收一些输入数据。为了保持呼叫者响应,我需要发送一条即时响应消息,如 "ok" 以及 201 状态代码(后者可选)。
到目前为止我一直在使用这个:
from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
def main_process(parameters)
...some long task
@app.post('/task')
async def do_task(reference_id: int,
bucket: str,
document_url: str,
return_url: str,
background_tasks: BackgroundTasks):
background_tasks.add_task(main_process, bucket, document_url, reference_id, return_url)
return 'ok'
每个 main_process
任务从 S3 中的存储桶下载图像,然后进行一些处理。上面显示的解决方案工作正常,直到它达到大约 10 个异步处理的图像(给定异步定义)然后崩溃。
我也试过增加一些 gunicorn 参数,比如 max-requests
到 100,像这样:
gunicorn api:app -b 0.0.0.0:8000 -w 4 -k uvicorn.workers.UvicornWorker --preload --max-requests 100 --daemon
这给了我更多的处理空间(多了 20 张图像),但它还是崩溃了。
我也考虑过使用 Celery 或一些分布式任务队列解决方案,但我希望尽可能简单。
由于异步行为并不重要,但即时响应才是关键,是否可以切换到同步解决方案但立即获得 "ok" 响应?
不,您必须真正分派任务并将其委托给某些处理后端。这样的后端可以非常简单,例如只是一个任务队列(celery/amqp、redis、关系数据库,任何适合您需要的)和至少一个进程使用该队列,执行计算并将结果反馈回存储。
当您从 API 发送请求时,同时生成一个 UUID 并将其与您的计算作业一起存储在队列中。当您将快速 200 OK 反馈给呼叫者时,还要向他们提供他们工作的 UUID(如果需要)。他们会再次点击您的 API 查询结果;让他们提供 UUID 并使用它在您的存储后端中查找结果。
为避免两次计算相同的请求,从请求中生成一个散列并使用它代替 UUID(注意冲突,您需要一些更长的散列)。只要您不必处理 user/image 权限,这很容易实现。
我有一个非常耗时的任务(图像处理),它从提供给 FastAPI 端点的请求中接收一些输入数据。为了保持呼叫者响应,我需要发送一条即时响应消息,如 "ok" 以及 201 状态代码(后者可选)。
到目前为止我一直在使用这个:
from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
def main_process(parameters)
...some long task
@app.post('/task')
async def do_task(reference_id: int,
bucket: str,
document_url: str,
return_url: str,
background_tasks: BackgroundTasks):
background_tasks.add_task(main_process, bucket, document_url, reference_id, return_url)
return 'ok'
每个 main_process
任务从 S3 中的存储桶下载图像,然后进行一些处理。上面显示的解决方案工作正常,直到它达到大约 10 个异步处理的图像(给定异步定义)然后崩溃。
我也试过增加一些 gunicorn 参数,比如 max-requests
到 100,像这样:
gunicorn api:app -b 0.0.0.0:8000 -w 4 -k uvicorn.workers.UvicornWorker --preload --max-requests 100 --daemon
这给了我更多的处理空间(多了 20 张图像),但它还是崩溃了。
我也考虑过使用 Celery 或一些分布式任务队列解决方案,但我希望尽可能简单。
由于异步行为并不重要,但即时响应才是关键,是否可以切换到同步解决方案但立即获得 "ok" 响应?
不,您必须真正分派任务并将其委托给某些处理后端。这样的后端可以非常简单,例如只是一个任务队列(celery/amqp、redis、关系数据库,任何适合您需要的)和至少一个进程使用该队列,执行计算并将结果反馈回存储。
当您从 API 发送请求时,同时生成一个 UUID 并将其与您的计算作业一起存储在队列中。当您将快速 200 OK 反馈给呼叫者时,还要向他们提供他们工作的 UUID(如果需要)。他们会再次点击您的 API 查询结果;让他们提供 UUID 并使用它在您的存储后端中查找结果。
为避免两次计算相同的请求,从请求中生成一个散列并使用它代替 UUID(注意冲突,您需要一些更长的散列)。只要您不必处理 user/image 权限,这很容易实现。