FastAPI - 如何在中间件中获取响应体
FastAPI - How to get the response body in Middleware
有什么办法可以在中间件中获取响应内容吗?
以下代码是从 here.
复制过来的
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
response.headers["X-Process-Time"] = str(process_time)
return response
响应体是一个迭代器,一旦被迭代过,就不能再re-iterated了。因此,您必须将所有迭代数据保存到 list
(或 bytes
变量)并将其用于 return 自定义 Response
,或者再次启动迭代器。下面的选项展示了这两种方法。
选项 1
将数据保存到list
并使用iterate_in_threadpool
to initiate the iterator again, as described here - which is what StreamingResponse
uses, as shown here。
from starlette.concurrency import iterate_in_threadpool
@app.middleware("http")
async def some_middleware(request: Request, call_next):
response = await call_next(request)
response_body = [chunk async for chunk in response.body_iterator]
response.body_iterator = iterate_in_threadpool(iter(response_body))
print(f"response_body={response_body[0].decode()}")
return response
注意 1: 如果您的代码使用 StreamingResponse
,response_body[0]
将仅打印出 [=22= 的第一个 chunk
].要获得整个 response_body
,您应该加入该字节(块)列表,如下所示(.decode()
returns bytes
对象的字符串表示形式):
print(f"response_body={(b''.join(response_body)).decode()}")
注意 2: 如果您有一个 StreamingResponse
流式传输不适合您服务器 RAM 的正文(例如,30GB 的响应),您迭代 response.body_iterator
时可能 运行 进入内存错误。这适用于此答案中列出的两个选项。
选项 2
下面演示了另一种方法,其中响应主体存储在 bytes
对象中(而不是列表,如上所示),并用于 return 自定义 Response
直接(连同原始响应的 status_code
、headers
和 media_type
)。
@app.middleware("http")
async def some_middleware(request: Request, call_next):
response = await call_next(request)
response_body = b""
async for chunk in response.body_iterator:
response_body += chunk
print(f"response_body={response_body.decode()}")
return Response(content=response_body, status_code=response.status_code,
headers=dict(response.headers), media_type=response.media_type)
有什么办法可以在中间件中获取响应内容吗? 以下代码是从 here.
复制过来的@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
response.headers["X-Process-Time"] = str(process_time)
return response
响应体是一个迭代器,一旦被迭代过,就不能再re-iterated了。因此,您必须将所有迭代数据保存到 list
(或 bytes
变量)并将其用于 return 自定义 Response
,或者再次启动迭代器。下面的选项展示了这两种方法。
选项 1
将数据保存到list
并使用iterate_in_threadpool
to initiate the iterator again, as described here - which is what StreamingResponse
uses, as shown here。
from starlette.concurrency import iterate_in_threadpool
@app.middleware("http")
async def some_middleware(request: Request, call_next):
response = await call_next(request)
response_body = [chunk async for chunk in response.body_iterator]
response.body_iterator = iterate_in_threadpool(iter(response_body))
print(f"response_body={response_body[0].decode()}")
return response
注意 1: 如果您的代码使用 StreamingResponse
,response_body[0]
将仅打印出 [=22= 的第一个 chunk
].要获得整个 response_body
,您应该加入该字节(块)列表,如下所示(.decode()
returns bytes
对象的字符串表示形式):
print(f"response_body={(b''.join(response_body)).decode()}")
注意 2: 如果您有一个 StreamingResponse
流式传输不适合您服务器 RAM 的正文(例如,30GB 的响应),您迭代 response.body_iterator
时可能 运行 进入内存错误。这适用于此答案中列出的两个选项。
选项 2
下面演示了另一种方法,其中响应主体存储在 bytes
对象中(而不是列表,如上所示),并用于 return 自定义 Response
直接(连同原始响应的 status_code
、headers
和 media_type
)。
@app.middleware("http")
async def some_middleware(request: Request, call_next):
response = await call_next(request)
response_body = b""
async for chunk in response.body_iterator:
response_body += chunk
print(f"response_body={response_body.decode()}")
return Response(content=response_body, status_code=response.status_code,
headers=dict(response.headers), media_type=response.media_type)