FastAPI - 如何在中间件中获取响应体

FastAPI - How to get the response body in Middleware

有什么办法可以在中间件中获取响应内容吗? 以下代码是从 here.

复制过来的
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    start_time = time.time()

    response = await call_next(request)

    process_time = time.time() - start_time
    response.headers["X-Process-Time"] = str(process_time)
    return response

响应体是一个迭代器,一旦被迭代过,就不能再re-iterated了。因此,您必须将所有迭代数据保存到 list(或 bytes 变量)并将其用于 return 自定义 Response,或者再次启动迭代器。下面的选项展示了这两种方法。

选项 1

将数据保存到list并使用iterate_in_threadpool to initiate the iterator again, as described here - which is what StreamingResponse uses, as shown here

from starlette.concurrency import iterate_in_threadpool

@app.middleware("http")
async def some_middleware(request: Request, call_next):
    response = await call_next(request)
    response_body = [chunk async for chunk in response.body_iterator]
    response.body_iterator = iterate_in_threadpool(iter(response_body))
    print(f"response_body={response_body[0].decode()}")
    return response

注意 1: 如果您的代码使用 StreamingResponseresponse_body[0] 将仅打印出 [=22= 的第一个 chunk ].要获得整个 response_body,您应该加入该字节(块)列表,如下所示(.decode() returns bytes 对象的字符串表示形式):

print(f"response_body={(b''.join(response_body)).decode()}")

注意 2: 如果您有一个 StreamingResponse 流式传输不适合您服务器 RAM 的正文(例如,30GB 的响应),您迭代 response.body_iterator 时可能 运行 进入内存错误。这适用于此答案中列出的两个选项。

选项 2

下面演示了另一种方法,其中响应主体存储在 bytes 对象中(而不是列表,如上所示),并用于 return 自定义 Response 直接(连同原始响应的 status_codeheadersmedia_type)。

@app.middleware("http")
async def some_middleware(request: Request, call_next):
    response = await call_next(request)
    response_body = b""
    async for chunk in response.body_iterator:
        response_body += chunk
    print(f"response_body={response_body.decode()}")
    return Response(content=response_body, status_code=response.status_code, 
        headers=dict(response.headers), media_type=response.media_type)