如何在 FastAPI 应用程序中发送操作进度?

How to send a progress of operation in a FastAPI app?

我已经部署了一个 fastapi 端点,

from fastapi import FastAPI, UploadFile
from typing import List

app = FastAPI()

@app.post('/work/test')
async def testing(files: List(UploadFile)):
    for i in files:
        .......
        # do a lot of operations on each file

        # after than I am just writing that processed data into mysql database
        # cur.execute(...)
        # cur.commit()
        .......
    
    # just returning "OK" to confirm data is written into mysql
    return {"response" : "OK"}

我可以从 API 端点请求输出,它对我来说工作得很好。

现在,我面临的最大挑战是了解每次迭代需要多少时间。因为在 UI 部分(那些正在访问我的 API 端点的人)我想帮助他们为每个正在处理的 iteration/file 显示一个进度条(TIME TAKEN)。

有什么方法可以实现吗?如果是这样,请帮助我如何进一步进行?

谢谢。

方法

轮询

跟踪任务进度的首选方法是轮询:

  1. 收到 request 以在后端启动任务后:
    1. 在存储中创建一个 task object(例如内存中,redis 等)。 task object 必须包含以下数据:task IDstatus(待处理、已完成)、result 和其他。
    2. 运行 后台任务(协程、线程、多处理、任务队列 Celery, arq, aio-pika, dramatiq 等)
    3. 通过 return 之前收到的 task ID.
    4. 立即回复答案 202 (Accepted)
  2. 更新任务状态:
    1. 这可以来自任务本身,如果它知道任务存储并且可以访问它。任务本身会定期更新有关自身的信息。
    2. 或使用任务监视器(Observerproducer-consumer 模式),它将监视任务的状态及其结果。并且它还会更新存储中的信息。
  3. client side (front-end) 上启动一个 轮询周期 用于端点 /task/{ID}/status 的任务状态,它从任务存储。

流式响应

Streaming 是一种不太方便的定期获取请求处理状态的方法。当我们在不关闭连接的情况下逐渐推送响应时。它有许多明显的缺点,例如,如果连接中断,您可能会丢失信息。 Streaming Api 是 REST Api.

之外的另一种方法

网络套接字

您还可以使用 websockets 进行实时通知和双向通信。

链接:

  • 进度条的轮询方法示例和 django + celery 的更详细说明可在以下链接中找到:

https://www.dangtrinh.com/2013/07/django-celery-display-progress-bar-of.html

https://buildwithdjango.com/blog/post/celery-progress-bars/

  • 我在此处提供了使用多处理在 FastAPI 中 运行ning 后台任务的简化示例:

旧答案:

您可以在后台 运行 一个任务,return 它 id 并提供一个前端会定期调用的 /status 端点。在状态响应中,您可以 return 您的任务现在处于什么状态(例如,等待当前处理的文件的编号)。我提供了几个简单的例子 .

演示

轮询

使用 asyncio 任务的方法演示(单工作者解决方案):

import asyncio
from http import HTTPStatus
from fastapi import BackgroundTasks
from typing import Dict, List
from uuid import UUID, uuid4
import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel, Field


class Job(BaseModel):
    uid: UUID = Field(default_factory=uuid4)
    status: str = "in_progress"
    progress: int = 0
    result: int = None


app = FastAPI()
jobs: Dict[UUID, Job] = {}  # Dict as job storage


async def long_task(queue: asyncio.Queue, param: int):
    for i in range(1, param):  # do work and return our progress
        await asyncio.sleep(1)
        await queue.put(i)
    await queue.put(None)


async def start_new_task(uid: UUID, param: int) -> None:

    queue = asyncio.Queue()
    task = asyncio.create_task(long_task(queue, param))

    while progress := await queue.get():  # monitor task progress
        jobs[uid].progress = progress

    jobs[uid].status = "complete"


@app.post("/new_task/{param}", status_code=HTTPStatus.ACCEPTED)
async def task_handler(background_tasks: BackgroundTasks, param: int):
    new_task = Job()
    jobs[new_task.uid] = new_task
    background_tasks.add_task(start_new_task, new_task.uid, param)
    return new_task


@app.get("/task/{uid}/status")
async def status_handler(uid: UUID):
    return jobs[uid]

来自问题

的循环改编示例

后台处理函数定义为 def 并且 FastAPI 运行 将其放在线程池中。

import time
from http import HTTPStatus

from fastapi import BackgroundTasks, UploadFile, File
from typing import Dict, List
from uuid import UUID, uuid4
from fastapi import FastAPI
from pydantic import BaseModel, Field


class Job(BaseModel):
    uid: UUID = Field(default_factory=uuid4)
    status: str = "in_progress"
    processed_files: List[str] = Field(default_factory=list)


app = FastAPI()
jobs: Dict[UUID, Job] = {}


def process_files(task_id: UUID, files: List[UploadFile]):
    for i in files:
        time.sleep(5)  # pretend long task
        # ...
        # do a lot of operations on each file
        # then append the processed file to a list
        # ...
        jobs[task_id].processed_files.append(i.filename)
    jobs[task_id].status = "completed"


@app.post('/work/test', status_code=HTTPStatus.ACCEPTED)
async def work(background_tasks: BackgroundTasks, files: List[UploadFile] = File(...)):
    new_task = Job()
    jobs[new_task.uid] = new_task
    background_tasks.add_task(process_files, new_task.uid, files)
    return new_task


@app.get("/work/{uid}/status")
async def status_handler(uid: UUID):
    return jobs[uid]

流媒体

async def process_files_gen(files: List[UploadFile]):
    for i in files:
        time.sleep(5)  # pretend long task
        # ...
        # do a lot of operations on each file
        # then append the processed file to a list
        # ...
        yield f"{i.filename} processed\n"
    yield f"OK\n"


@app.post('/work/stream/test', status_code=HTTPStatus.ACCEPTED)
async def work(files: List[UploadFile] = File(...)):
    return StreamingResponse(process_files_gen(files))

以下是使用唯一标识符和包含作业信息的全局可用字典的解决方案:

注意:在您使用动态键值(在使用中的示例 uuid 中)并将应用程序保持在单个进程中之前,可以安全使用下面的代码。

  1. 要启动应用程序,请创建一个文件main.py
  2. 运行 uvicorn main:app --reload
  3. 通过访问 http://127.0.0.1:8000/
  4. 创建工作条目
  5. 重复步骤 3 以创建多个作业
  6. 转到 http://127.0.0.1/status 页面查看页面状态。
  7. 转到 http://127.0.0.1/status/{identifier} 按作业 ID 查看作业的进度。

应用代码:

from fastapi import FastAPI, UploadFile
import uuid
from typing import List


import asyncio


context = {'jobs': {}}

app = FastAPI()



async def do_work(job_key, files=None):
    iter_over = files if files else range(100)
    for file, file_number in enumerate(iter_over):
        jobs = context['jobs']
        job_info = jobs[job_key]
        job_info['iteration'] = file_number
        job_info['status'] = 'inprogress'
        await asyncio.sleep(1)
    pending_jobs[job_key]['status'] = 'done'


@app.post('/work/test')
async def testing(files: List[UploadFile]):
    identifier = str(uuid.uuid4())
    context[jobs][identifier] = {}
    asyncio.run_coroutine_threadsafe(do_work(identifier, files), loop=asyncio.get_running_loop())

    return {"identifier": identifier}


@app.get('/')
async def get_testing():
    identifier = str(uuid.uuid4())
    context['jobs'][identifier] = {}
    asyncio.run_coroutine_threadsafe(do_work(identifier), loop=asyncio.get_running_loop())

    return {"identifier": identifier}

@app.get('/status')
def status():
    return {
        'all': list(context['jobs'].values()),
    }

@app.get('/status/{identifier}')
async def status(identifier):
    return {
        "status": context['jobs'].get(identifier, 'job with that identifier is undefined'),
    }