Модуль Python aiohttp: неоднозначный атрибут .content

Вот небольшой фрагмент кода:

import aiohttp
import aiofiles

async def fetch(url):
    # starting a session
    async with aiohttp.ClientSession() as session:
        # starting a get request
        async with session.get(url) as response:
            # getting response content
            content = await response.content
            return content
 
async def save_file(file_name, content):
    async with aiofiles.open(f'./binary/{file_name}', 'wb') as f:
      while True:
            chunk = content.read(1024)
            if not chunk:
                break
            f.write(chunk)

Я пытаюсь загрузить некоторые двоичные файлы с помощью aiohttp библиотека, а затем передать их сопрограмме с помощью aiofilesбиблиотека для записи файла на диск. Я прочитал документацию, но все еще не могу понять, смогу ли я пройти content = await response.content или он закрыт, когда ручка async with..закрыто? Потому что в дополнительном блоге я нашел:

Согласно документации aiohttp, поскольку объект ответа был создан в диспетчере контекста, технически он неявно вызывает release().

Что меня смущает, стоит ли встраивать логику второй функции в response ручка или моя логика верна?

2 ответа

Решение

Благодаря user4815162342 я смог найти решение, парелелизируя выборку и записывая сопрограммы. Я бы проверил его код как приемлемое решение, но поскольку мне пришлось добавить код, чтобы он заработал, вот он:

# fetch binary from server
async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            async for chunk in response.content.iter_chunked(4096):
                yield chunk

# write binary function
async def save_file(file_name, chunk_iter):
    list(map(create_dir_tree, list_binary_sub_dirs))
    async with aiofiles.open(f'./binary/bin_ts/{file_name}', 'wb') as f:
        async for chunk in chunk_iter:
            await f.write(chunk)
    

async def main(urls):
    tasks = []
    for url in urls:
        print('running on sublist')
        file_name = url.rpartition('/')[-1]
        request_ts = fetch(url)
        tasks.append(save_file(file_name, request_ts))
    await asyncio.gather(*tasks)

asyncio.run(main(some_list_of_urls))

Менеджер асинхронного контекста закроет ресурсы, связанные с запросом, поэтому, если вы вернетесь из функции, вы должны убедиться, что прочитали все, что вас интересует. Итак, у вас есть два варианта:

  1. прочитать весь ответ в память, например, с помощью content = await response.read() или, если файл не помещается в память (а также, если вы хотите ускорить процесс, читая и записывая параллельно)
  2. используйте очередь или асинхронный итератор для распараллеливания чтения и записи.

Вот непроверенная реализация №2:

async def fetch(url):
    # return an async generator over contents of URL
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            # getting response content in chunks no larger than 4K
            for chunk in response.content.iter_chunked(4096):
                yield chunk
 
async def save_file(file_name, content_iter):
    async with aiofiles.open(f'./binary/{file_name}', 'wb') as f:
        for chunk in content_iter:
            f.write(chunk)  # maybe you need to await this?

async def main():
    save_file(file_name, fetch(url))
Другие вопросы по тегам