FastAPI:如何在 return 响应后记录 POST 方法的 return 值?

FastAPI: How to log the return value of a POST method after returning the response?

所以我正在研究我的第一个 REST API,如果我错过了一些基本的东西,请提前致歉。无论如何,我有一个函数从另一个服务器接收 json 请求,对其进行处理(根据数据进行预测),然后 return 将结果发送给另一个 json。我想在服务器的本地磁盘上记录对该端点的所有请求及其结果,以用于评估目的和重新训练模型。但是,为了最大限度地减少 return 将结果发送给用户的延迟,我想先 return 响应数据,然后再将其写入本地磁盘。对我来说如何正确地做到这一点并不明显,因为 FastAPI 范式要求 POST 方法的结果是修饰函数的 return 值,所以我想要的任何东西处理数据必须在 之前完成 returned.

下面是一个最小的工作示例,我认为这是迄今为止我最接近正确的尝试,使用带有 log 装饰器的自定义对象 - 我的想法只是将结果分配给日志对象作为 class 属性,然后使用另一种方法将其写入磁盘,但我不知道如何确保该函数在 after get_data 每次。

import json
import uvicorn
from fastapi import FastAPI, Request
from functools import wraps
from pydantic import BaseModel

class Blob(BaseModel):
    id: int
    x: float

def crunch_numbers(data: Blob) -> dict:
    # does some stuff
    return {'foo': 'bar'}

class PostResponseLogger:

    def __init__(self) -> None:
        self.post_result = None

    def log(self, func, *args, **kwargs):
        @wraps(func)
        def func_to_log(*args, **kwargs):
            post_result = func(*args, **kwargs)
            self.post_result = post_result

            # how can this be done outside of this function ???
            self.write_data()

            return post_result
        return func_to_log

    def write_data(self):
        if self.post_result:
            with open('output.json', 'w') as f:
                    json.dump(self.post_result, f)

def main():
    app = FastAPI()
    logger = PostResponseLogger()

    @app.post('/get_data/')
    @logger.log
    def get_data(input_json: dict, request: Request):
        result = crunch_numbers(input_json)
        return result

    uvicorn.run(app=app)

if __name__ == '__main__':
    main()

基本上,我的问题归结为:“有没有办法在 PostResponseLogger class 中,在每次调用 self.log 后自动调用 self.write_data? ",但如果我完全使用了错误的方法,也欢迎任何其他建议:)

你可以有一个 background task for that purpose. A background task "will run only once the response has been sent" (as per Starlette documentation)。可以在后台定义一个任务函数运行用于写入日志数据,如下图:

def write_log_data():
    logger.write_data()

为您的 route 定义类型声明为 BackgroundTasks 的参数,并在 route 内部将(上述)任务函数传递给 background_tasks使用方法 add_task():

的对象
from fastapi import BackgroundTasks

@app.post('/get_data/')
@logger.log
def get_data(input_json: dict, request: Request, background_tasks: BackgroundTasks):
    result = crunch_numbers(input_json)
    background_tasks.add_task(write_log_data)
    return result

如果使用中间件来捕获和记录响应数据,则可以应用相同的原则,如 . As a future reference, if you (or anyone) ever need to use async/await syntax, and run into concurrency issues (while performing some heavy background computation), please have a look at the documentation here, as well as and 答案所述。