Промежуточное ПО FastAPI просматривает ответы

Я пытаюсь написать простое промежуточное ПО для FastAPI, заглядывающего в тела ответа.

В этом примере я просто регистрирую содержимое тела:

app = FastAPI()

@app.middleware("http")
async def log_request(request, call_next):
    logger.info(f'{request.method} {request.url}')
    response = await call_next(request)
    logger.info(f'Status code: {response.status_code}')
    async for line in response.body_iterator:
        logger.info(f'    {line}')
    return response

Однако похоже, что я "потребляю" тело таким образом, что приводит к следующему исключению:

  ...
  File ".../python3.7/site-packages/starlette/middleware/base.py", line 26, in __call__
    await response(scope, receive, send)
  File ".../python3.7/site-packages/starlette/responses.py", line 201, in __call__
    await send({"type": "http.response.body", "body": b"", "more_body": False})
  File ".../python3.7/site-packages/starlette/middleware/errors.py", line 156, in _send
    await send(message)
  File ".../python3.7/site-packages/uvicorn/protocols/http/httptools_impl.py", line 515, in send
    raise RuntimeError("Response content shorter than Content-Length")
RuntimeError: Response content shorter than Content-Length

Пытаясь заглянуть в объект ответа, я не нашел другого способа прочитать его содержимое. Как правильно это делать?

4 ответа

Решение

У меня была аналогичная потребность в промежуточном программном обеспечении FastAPI, и хотя это не идеально, вот что мы в итоге получили:

app = FastAPI()

@app.middleware("http")
async def log_request(request, call_next):
    logger.info(f'{request.method} {request.url}')
    response = await call_next(request)
    logger.info(f'Status code: {response.status_code}')
    body = b""
    async for chunk in response.body_iterator:
        body += chunk
    # do something with body ...
    return Response(
        content=body,
        status_code=response.status_code,
        headers=dict(response.headers),
        media_type=response.media_type
    )

Имейте в виду, что такая реализация проблематична из-за потоковых ответов, которые не помещаются в ОЗУ вашего сервера (представьте ответ размером 100 ГБ).

В зависимости от того, что делает ваше приложение, вы решите, проблема это или нет.


В случае, когда некоторые из ваших конечных точек дают большие ответы, вы можете избежать использования промежуточного программного обеспечения и вместо этого реализовать собственный ApiRoute. Этот настраиваемый ApiRoute будет иметь ту же проблему с потреблением тела, но вы можете ограничить его использование определенными конечными точками.

Узнайте больше на https://fastapi.tiangolo.com/advanced/custom-request-and-route/

Я знаю, что это относительно старый пост, но недавно я столкнулся с этой проблемой и нашел решение:

Код промежуточного программного обеспечения

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
import json
from .async_iterator_wrapper import async_iterator_wrapper as aiwrap

class some_middleware(BaseHTTPMiddleware):
   async def dispatch(self, request:Request, call_next:RequestResponseEndpoint):
      # --------------------------
      # DO WHATEVER YOU TO DO HERE
      #---------------------------
      
      response = await call_next(request)

      # Consuming FastAPI response and grabbing body here
      resp_body = [section async for section in response.__dict__['body_iterator']]
      # Repairing FastAPI response
      response.__setattr__('body_iterator', aiwrap(resp_body)

      # Formatting response body for logging
      try:
         resp_body = json.loads(resp_body[0].decode())
      except:
         resp_body = str(resp_body)

async_iterator_wrapper Код из TypeError из Python 3 async for loop

class async_iterator_wrapper:
    def __init__(self, obj):
        self._it = iter(obj)
    def __aiter__(self):
        return self
    async def __anext__(self):
        try:
            value = next(self._it)
        except StopIteration:
            raise StopAsyncIteration
        return value

Я очень надеюсь, что это может кому-то помочь! Я нашел это очень полезным для ведения журнала.

Большое спасибо @Eddified за класс aiwrap

Это можно легко сделать с помощью BackgroundTasks (https://fastapi.tiangolo.com/tutorial/background-tasks/) .

Не блокирующий, код выполняется после отправки ответа клиенту, его довольно легко добавить.

Просто возьмите requestобъект и передать его фоновой задаче. Кроме того, прежде чем возвращать ответный дикт (или любые другие данные), передайте его фоновой задаче. Недостатком является то, что часть responseпотерян, только возвращенные данные будут переданы в BT.

Кроме того, еще один недостаток: эти фоновые задачи должны быть добавлены к каждой конечной точке.

Например

      from fastapi import BackgroundTasks, FastAPI
from starlette.requests import Request

app = FastAPI()

async def log_request(request, response):
    logger.info(f'{request.method} {request.url}')
    logger.info(f'{response['message']}')


@app.post("/dummy-endpoint/")
async def send_notification(request: Request, background_tasks: BackgroundTasks):
    my_response = {"message": "Notification sent in the background"}
    background_tasks.add_task(log_request, request=request, response=my_response)
    return my_response

Или, если вы используете APIRouter, мы можем сделать так:

      class CustomAPIRoute(APIRoute):
    def get_route_handler(self):
        app = super().get_route_handler()
        return wrapper(app)

def wrapper(func):
    async def _app(request):
        response = await func(request)

        print(vars(request), vars(response))

        return response
    return _app

router = APIRouter(route_class=CustomAPIRoute)

вы можете напрямую видеть или получать доступ к ответу, телу запроса и другим атрибутам.
Если вы хотите захватить httpexcpetion, вам следует обернуть response = await func(request) с участием try: except HTTPException as e.

ссылки:
get_request_handler() - вызов get_route_handler
get_request_handler get_route_handler ()
класс APIRoute

Другие вопросы по тегам