如何 运行 并行后台任务 (python, FastAPI)
How to run background task in parallel (python, FastAPI)
我目前正在开发具有多个端点的 API。其中之一是在数据库中注册数据,其他端点与简单的 CRUD 端点相关(通过 id 获取数据、获取所有数据、删除数据等)。
调用注册数据端点时,响应几乎立即返回给 API,然后后台任务启动,我们在其中获取数据,必要时解压缩等。
我们为此使用快速API 和异步函数。不过,我注意到 API 被后台任务的执行阻止了。当我一次将一个大文件上传到 S3 时(不是分块,为此我使用异步函数),这尤其糟糕,我必须等待完整文件的上传结束,然后另一个请求才能得到响应(就像请求 获取所有数据 端点时))。
我是并行和并发方面的专家,但我期待的是后台任务而不是阻塞 API。
关于如何 运行 这个漫长的 运行ning 后台任务不会阻止对 API 的新请求的任何想法?芹菜最适合这个吗?
模拟示例:
@app.post("register endpoint")
async def register_data(datainput, backgroundtasks: BackgroundTasks):
#Do something
backgroundtasks.add_task(background_operation)
return JSONResponse(
status_code=200, content="Doing stuff")
)
async def background_operation():
#Doing stuff here
await function_that_uploads_data_to_s3
更新:对我有用的是重写我的后台函数(以及因此,我的大部分代码)而不是异步(async def 到 def)。这允许后台任务 运行 在一个单独的线程中,同时允许 API 仍然响应。不确定这是否是最佳选择,但这是目前唯一可行的方法。将来我们可能会考虑为此使用 celery 或将负责 API 调用的服务与负责实际长 运行ning 后台操作的另一个服务分开。
我们有一个类似的问题,重要的区别是我们的是 CPU 绑定计算,不像 OP 的文件上传,这是 IO 绑定。我们使用 Flask 而不是 FastAPI,尽管该解决方案对任何一个都适用。
我最终为每个请求生成了一个进程(在一些基本验证之后),就像这样:
@app.route('/start-job', methods=['POST'])
def start_job():
validate_request()
t2 = Process(target=self.compute,
args=(
a,b,c,d))
t2.start()
return "Job Started Successfully"
可能不适合生产级应用程序,但您确实可以用最少的努力实现 CPU 并行化。
我目前正在开发具有多个端点的 API。其中之一是在数据库中注册数据,其他端点与简单的 CRUD 端点相关(通过 id 获取数据、获取所有数据、删除数据等)。
调用注册数据端点时,响应几乎立即返回给 API,然后后台任务启动,我们在其中获取数据,必要时解压缩等。
我们为此使用快速API 和异步函数。不过,我注意到 API 被后台任务的执行阻止了。当我一次将一个大文件上传到 S3 时(不是分块,为此我使用异步函数),这尤其糟糕,我必须等待完整文件的上传结束,然后另一个请求才能得到响应(就像请求 获取所有数据 端点时))。
我是并行和并发方面的专家,但我期待的是后台任务而不是阻塞 API。
关于如何 运行 这个漫长的 运行ning 后台任务不会阻止对 API 的新请求的任何想法?芹菜最适合这个吗?
模拟示例:
@app.post("register endpoint")
async def register_data(datainput, backgroundtasks: BackgroundTasks):
#Do something
backgroundtasks.add_task(background_operation)
return JSONResponse(
status_code=200, content="Doing stuff")
)
async def background_operation():
#Doing stuff here
await function_that_uploads_data_to_s3
更新:对我有用的是重写我的后台函数(以及因此,我的大部分代码)而不是异步(async def 到 def)。这允许后台任务 运行 在一个单独的线程中,同时允许 API 仍然响应。不确定这是否是最佳选择,但这是目前唯一可行的方法。将来我们可能会考虑为此使用 celery 或将负责 API 调用的服务与负责实际长 运行ning 后台操作的另一个服务分开。
我们有一个类似的问题,重要的区别是我们的是 CPU 绑定计算,不像 OP 的文件上传,这是 IO 绑定。我们使用 Flask 而不是 FastAPI,尽管该解决方案对任何一个都适用。
我最终为每个请求生成了一个进程(在一些基本验证之后),就像这样:
@app.route('/start-job', methods=['POST'])
def start_job():
validate_request()
t2 = Process(target=self.compute,
args=(
a,b,c,d))
t2.start()
return "Job Started Successfully"
可能不适合生产级应用程序,但您确实可以用最少的努力实现 CPU 并行化。