窥视响应的 FastAPI 中间件

FastAPI middleware peeking into responses

我尝试为 FastAPI 编写一个简单的中间件来窥视响应主体。

在这个例子中我只记录正文内容:

app = FastAPI()

@app.middleware("http")
async def log_request(request, call_next):
    logger.info(f'{request.method} {request.url}')
    response = await call_next(request)
    logger.info(f'Status code: {response.status_code}')
    async for line in response.body_iterator:
        logger.info(f'    {line}')
    return response

不过看起来我"consume"正文是这样的,导致这个异常:

  ...
  File ".../python3.7/site-packages/starlette/middleware/base.py", line 26, in __call__
    await response(scope, receive, send)
  File ".../python3.7/site-packages/starlette/responses.py", line 201, in __call__
    await send({"type": "http.response.body", "body": b"", "more_body": False})
  File ".../python3.7/site-packages/starlette/middleware/errors.py", line 156, in _send
    await send(message)
  File ".../python3.7/site-packages/uvicorn/protocols/http/httptools_impl.py", line 515, in send
    raise RuntimeError("Response content shorter than Content-Length")
RuntimeError: Response content shorter than Content-Length

尝试查看响应对象时,我找不到任何其他方式来读取其内容。正确的做法是什么?

我对 FastAPI 中间件有类似的需求,虽然不理想,但我们最终得到的是:

app = FastAPI()

@app.middleware("http")
async def log_request(request, call_next):
    logger.info(f'{request.method} {request.url}')
    response = await call_next(request)
    logger.info(f'Status code: {response.status_code}')
    body = b""
    async for chunk in response.body_iterator:
        body += chunk
    # do something with body ...
    return Response(
        content=body,
        status_code=response.status_code,
        headers=dict(response.headers),
        media_type=response.media_type
    )

请注意,这样的实现会产生问题,因为响应会流式传输不适合您的服务器 RAM 的正文(想象一下 100GB 的响应)。

根据您的应用程序的用途,您将决定它是否是一个问题。


如果您的某些端点产生大量响应,您可能希望避免使用中间件,而是实施自定义 ApiRoute。此自定义 ApiRoute 在使用正文时会遇到相同的问题,但您可以将其使用限制在特定端点。

https://fastapi.tiangolo.com/advanced/custom-request-and-route/

了解更多

我知道这是一个比较老的 post,但我最近 运行 遇到了这个问题并提出了解决方案:

中间件代码

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
import json
from .async_iterator_wrapper import async_iterator_wrapper as aiwrap

class some_middleware(BaseHTTPMiddleware):
   async def dispatch(self, request:Request, call_next:RequestResponseEndpoint):
      # --------------------------
      # DO WHATEVER YOU TO DO HERE
      #---------------------------
      
      response = await call_next(request)

      # Consuming FastAPI response and grabbing body here
      resp_body = [section async for section in response.__dict__['body_iterator']]
      # Repairing FastAPI response
      response.__setattr__('body_iterator', aiwrap(resp_body)

      # Formatting response body for logging
      try:
         resp_body = json.loads(resp_body[0].decode())
      except:
         resp_body = str(resp_body)

async_iterator_wrapper代码来自

class async_iterator_wrapper:
    def __init__(self, obj):
        self._it = iter(obj)
    def __aiter__(self):
        return self
    async def __anext__(self):
        try:
            value = next(self._it)
        except StopIteration:
            raise StopAsyncIteration
        return value

我真的希望这可以帮助别人!我发现这对日志记录很有帮助。

非常感谢@Eddified 的 aiwrap class

或者如果你使用API​​Router,我们也可以这样做:

class CustomAPIRoute(APIRoute):
    def get_route_handler(self):
        app = super().get_route_handler()
        return wrapper(app)

def wrapper(func):
    async def _app(request):
        response = await func(request)

        print(vars(request), vars(response))

        return response
    return _app

router = APIRouter(route_class=CustomAPIRoute)

您可以直接查看或访问响应和请求的主体,以及其他属性。
如果你想捕获 httpexcpetion,你应该用 try: except HTTPException as e.

包装 response = await func(request)

参考文献:
get_request_handler() - get_route_handler 调用 get_request_handler
get_route_handler()
APIRoute class

这可以通过 BackgroundTasks 轻松完成 (https://fastapi.tiangolo.com/tutorial/background-tasks/)

非阻塞,代码在响应发送到客户端后执行,非常容易添加。

只需获取 request 对象并将其传递给后台任务。 此外,在返回响应字典(或任何数据)之前,将其传递给后台任务。缺点是response有一部分丢失,只有返回的数据会传给BT。

此外,还有另一个缺点:必须将这些后台任务添加到每个端点。

例如

from fastapi import BackgroundTasks, FastAPI
from starlette.requests import Request

app = FastAPI()

async def log_request(request, response):
    logger.info(f'{request.method} {request.url}')
    logger.info(f'{response['message']}')


@app.post("/dummy-endpoint/")
async def send_notification(request: Request, background_tasks: BackgroundTasks):
    my_response = {"message": "Notification sent in the background"}
    background_tasks.add_task(log_request, request=request, response=my_response)
    return my_response