从 FastAPI 中的后台任务获取 return 状态
Get return status from Background Tasks in FastAPI
我有一个 API,它发布创建后台作业的作业,我想在另一个 GET api 上发送作业状态。如何做到这一点?在 background_work()
函数中,我将使用多处理作为内部调用目标 subprocess.call()
调用。
from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
def background_work(data: str):
# some computation on data and return it
return status
@app.post("/post_job", status_code=HTTP_201_CREATED)
async def send_notification(data: str, background_tasks: BackgroundTasks):
background_tasks.add_task(background_work, data)
return {"message": "Job Created, check status after some time!"}
@app.get("/get_status")
def status():
#how to return status of job submitted to background task
这目前无法通过 FastAPI 实现,因为后台任务只是对发送响应后要调用的可调用对象的引用,它们不存储任何类型的状态。
您将不得不使用 Celery 或其他库。
试试这个模式:
import time
from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
class TaskState:
def __init__(self):
self.counter = 0
def background_work(self):
while True:
self.counter += 1
time.sleep(1)
def get_state(self):
return self.counter
state = TaskState()
@app.post("/post_job", status_code=HTTP_201_CREATED)
async def send_notification(background_tasks: BackgroundTasks):
background_tasks.add_task(state.background_work)
return {"message": "Job Created, check status after some time!"}
@app.get("/get_status")
def status():
return state.get_state()
我正在像这样使用 fastAPI,结合 concurrent.futures.ProcessPoolExecutor()
和 asyncio 来管理长 运行 作业。
如果你不想依赖其他模块(芹菜等),你需要自己管理你的工作状态,并将其存储在某个地方。我将它存储在数据库中,以便在重新启动服务器后可以恢复挂起的作业。
请注意,您不得在应用程序的 background_tasks
中执行 CPU 密集计算,因为它在为请求提供服务的同一异步事件循环中运行,并且会停止您的应用程序。而是将它们提交到线程池或进程池。
我有一个 API,它发布创建后台作业的作业,我想在另一个 GET api 上发送作业状态。如何做到这一点?在 background_work()
函数中,我将使用多处理作为内部调用目标 subprocess.call()
调用。
from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
def background_work(data: str):
# some computation on data and return it
return status
@app.post("/post_job", status_code=HTTP_201_CREATED)
async def send_notification(data: str, background_tasks: BackgroundTasks):
background_tasks.add_task(background_work, data)
return {"message": "Job Created, check status after some time!"}
@app.get("/get_status")
def status():
#how to return status of job submitted to background task
这目前无法通过 FastAPI 实现,因为后台任务只是对发送响应后要调用的可调用对象的引用,它们不存储任何类型的状态。
您将不得不使用 Celery 或其他库。
试试这个模式:
import time
from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
class TaskState:
def __init__(self):
self.counter = 0
def background_work(self):
while True:
self.counter += 1
time.sleep(1)
def get_state(self):
return self.counter
state = TaskState()
@app.post("/post_job", status_code=HTTP_201_CREATED)
async def send_notification(background_tasks: BackgroundTasks):
background_tasks.add_task(state.background_work)
return {"message": "Job Created, check status after some time!"}
@app.get("/get_status")
def status():
return state.get_state()
我正在像这样使用 fastAPI,结合 concurrent.futures.ProcessPoolExecutor()
和 asyncio 来管理长 运行 作业。
如果你不想依赖其他模块(芹菜等),你需要自己管理你的工作状态,并将其存储在某个地方。我将它存储在数据库中,以便在重新启动服务器后可以恢复挂起的作业。
请注意,您不得在应用程序的 background_tasks
中执行 CPU 密集计算,因为它在为请求提供服务的同一异步事件循环中运行,并且会停止您的应用程序。而是将它们提交到线程池或进程池。