Параллельные циклы выборки URL с помощью Python
Мне нужно запустить около 500 параллельных циклов. Каждый цикл последовательно извлекает разбитую на страницы конечную точку REST, пока не достигнет последней страницы каждой из 500 конечных точек. Некоторые из этих циклов имеют всего от 5 до 10 страниц, поэтому они быстро заканчиваются, а другие имеют сотни страниц.
Проблема в том, что мне нужно поместить этот выбор URL в последовательный, блокирующий цикл, потому что каждая страница должна выбираться по порядку из-за ограничений API (API выдает ошибку, если я получу страницу 7, а затем страницу 5, например). Итак, единицей параллелизма здесь является каждый цикл, а не каждый URL, извлекаемый внутри цикла.
Там нет тяжелых вычислений нигде. Просто загрузите страницу и добавьте необработанный контент в тему кафки. Я открыт для любых предложений, кроме многопроцессорности, зависящей от многих ядер. AsyncIO, Gevent, многопоточность...
Изменить 1:
Фактическая проблема заключается в том, что если я использую aiohttp для выборки каждой страницы внутри каждого цикла асинхронно, у меня нет гарантий, что страница 2 будет выбрана после страницы 1. Запрос будет инициирован в правильной последовательности, но нет абсолютно никакой гарантии, что запрос поступит и будет обработан в конечной точке в правильной последовательности.
Изменить 2:
Как указано пользователем 4815162342 aiohttp должен работать
Спасибо!
1 ответ
В asyncio вы можете запустить параллельно столько циклов, сколько имеется конечных точек, и ждать, пока все они завершатся. Каждый цикл будет использовать aiohttp для последовательного извлечения страниц конечной точки. Например:
async def download_loop(session, endpoint):
for i in itertools.count(1):
try:
async with session.get(endpoint, params={'page': str(i)}) as resp:
content = await resp.read()
except aiohttp.ClientResponseError:
break # no more pages
# do something with the response content
async def download(endpoints):
loop = asyncio.get_event_loop()
async with aiohttp.ClientSession() as session:
# Start all loops in parallel and wait for them to finish.
# This will start as many loops as there are endpoints.
await asyncio.wait([download_loop(session, endpoint)
for endpoint in endpoints])
# for testing:
loop = asyncio.get_event_loop()
loop.run_until_complete(download(['http://endpoint1', 'http://endpoint2', ...]))
Производственный код, вероятно, также поймает aiohttp.ClientConnectionError
и повторите этот URL.