Как я могу использовать работу мотора open_download_stream с StreamingResponse FastAPI?

Я создаю конечную точку FastAPI, где пользователь веб-клиента может по существу загружать файлы, которые хранятся в MongoDB в виде фрагментов GridFS. Однако StreamingResponse FastAPI не принимает предположительно файлоподобный объект AsyncIOMotorGridOut , возвращаемый методом мотора open_download_stream .

У меня уже есть конечная точка, которая может принимать файлы в форме и вызывать их загрузку в MongoDB. Я ожидаю, что аналогичная вспомогательная функция загрузки будет такой же простой, как эта:

      async def upload_file(db, file: UploadFile):
    """ Uploads file to MongoDB GridFS file system and returns ID to be stored with collection document """
    fs = AsyncIOMotorGridFSBucket(db)
    file_id = await fs.upload_from_stream(
        file.filename,
        file.file,
        # chunk_size_bytes=255*1024*1024, #default 255kB
        metadata={"contentType": file.content_type})
    return file_id

Моя первая попытка - использовать такой помощник:

      async def download_file(db, file_id):
    """Returns  AsyncIOMotorGridOut (non-iterable file-like object)"""
    fs = AsyncIOMotorGridFSBucket(db)
    stream = await fs.open_download_stream(file_id)
    # return download_streamer(stream)
    return stream

Моя конечная точка FastAPI выглядит так:

      app.get("/file/{file_id}")
async def get_file(file_id):
    file = await download_file(db, file_id)
    return StreamingResponse(file, media_type=file.content_type)

При попытке загрузить файл с действительным file_id, я получаю эту ошибку: TypeError: 'AsyncIOMotorGridOut' object is not an iterator

Моя вторая попытка состояла в том, чтобы заставить генератор перебирать фрагменты файла:

      async def download_streamer(file: AsyncIOMotorGridOut):
    """ Returns generator file-like object to be served by StreamingResponse
    https://fastapi.tiangolo.com/advanced/custom-response/#streamingresponse
    """
    chunk_size = 255*1024*1024
    for chunk in await file.readchunk():
        print(f"chunk: {chunk}")
        yield chunk

Затем я использую комментарий return download_streamer(stream)в моем download_filehelper, но по какой-то причине каждый фрагмент представляет собой просто целое число 255.

Каков наилучший способ получить файл из MongoDB с помощью двигателя и передать его в виде веб-ответа FastAPI без использования временного файла? (У меня нет доступа к жесткому диску, и я не хочу хранить весь файл в памяти — я просто хочу передавать файлы из MongoDB через FastAPI напрямую клиенту по частям).

1 ответ

Мое решение состоит в том, чтобы создать генератор, который находится в синтаксисе Python 3.6+ в соответствии с этим ответом SO . Такой итератор работает с асинхронным вариантом FastAPI StreamingResponse и считывает один фрагмент GridFS за раз (по умолчанию 255 КБ на каждый моторный документ ), используя метод. Этот размер фрагмента устанавливается, когда файл хранится в MongoDB с использованием upload_from_stream(). Необязательная реализация должна была бы использовать .read(n)читать nбайт за раз. я решил использовать readchunk()поэтому во время потока за один раз извлекается ровно 1 документ БД (каждый файл GridFS разбивается на фрагменты и сохраняется в БД по одному фрагменту за раз)

      async def chunk_generator(grid_out):
    while True:
        # chunk = await grid_out.read(1024)
        chunk = await grid_out.readchunk()
        if not chunk:
            break
        yield chunk


async def download_file(db, file_id):
    """Returns iterator over AsyncIOMotorGridOut object"""
    fs = AsyncIOMotorGridFSBucket(db)
    grid_out = await fs.open_download_stream(file_id)
    return chunk_generator(grid_out)

Будущее улучшение будет состоять в том, чтобы иметь download_file()вернуть кортеж, чтобы включить не только генератор, но и метаданные, такие как ContentType.

Другие вопросы по тегам