Семафор / несколько блокировок пула в asyncio для 1 прокси - aiohttp

У меня есть 5000 000 URL. и хочу получить ответ каждого асинхронно.

import aiohttp
import asyncio    

@asyncio.coroutine
def worker(url):
    response = yield from aiohttp.request('GET', url, connector=aiohttp.TCPConnector(share_cookies=True, verify_ssl=False))
    body = yield from response.read_and_close()

    print(url)

def main():
    url_list = [] # lacs of urls, extracting from a file

    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait([worker(u) for u in url_list]))

main()

Я хочу 200 соединений одновременно (одновременно 200), не больше, чем это, потому что

когда я запускаю эту программу на 50 URL, она работает нормально, т.е. url_list[:50]но если я передам весь список, я получу эту ошибку

aiohttp.errors.ClientOSError: Cannot connect to host www.example.com:443 ssl:True Future/Task exception was never retrieved future: Task()

может быть, частота слишком велика, и сервер отказывается отвечать после лимита?

1 ответ

Решение

Да, можно ожидать, что сервер перестанет отвечать после того, как вызовет слишком большой трафик (независимо от определения "слишком большого трафика").

Одним из способов ограничить количество одновременных запросов (регулировать их) в таких случаях является использование asyncio.Semaphore аналогичен используемому в многопоточности: точно так же, как и здесь, вы создаете семафор и убедитесь, что операция, которую вы хотите отрегулировать, запрашивает этот семафор до выполнения реальной работы и освобождения ее после.

Для вашего удобства, asyncio.Semaphore реализует менеджер контекста, чтобы сделать его еще проще.

Самый основной подход:

CONCURRENT_REQUESTS = 200


@asyncio.coroutine
def worker(url, semaphore):
    # Aquiring/releasing semaphore using context manager.
    with (yield from semaphore):
        response = yield from aiohttp.request(
            'GET',
            url,
            connector=aiohttp.TCPConnector(share_cookies=True,
                                           verify_ssl=False))
        body = yield from response.read_and_close()

        print(url)


def main():
    url_list = [] # lacs of urls, extracting from a file

    semaphore = asyncio.Semaphore(CONCURRENT_REQUESTS)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait([worker(u, semaphore) for u in url_list]))    
Другие вопросы по тегам