Возможно возвращение данных генератора asyncio из цикла событий?
Я хотел бы читать из нескольких одновременных потоковых HTTP-запросов внутри сопрограмм с использованием httpx и возвращать данные моей неасинхронной функции, запускающей цикл событий, а не просто возвращать окончательные данные.
Но если я сделаю свои асинхронные функции yield вместо return, я получаю жалобы, что asyncio.as_completed()
а также loop.run_until_complete()
ожидает сопрограмму или будущее, а не асинхронный генератор.
Таким образом, единственный способ заставить это работать вообще - это собрать все потоковые данные внутри каждой сопрограммы и вернуть все данные после завершения запроса. Затем соберите все результаты сопрограммы и, наконец, верните их неасинхронной вызывающей функции.
Это означает, что я должен хранить все в памяти и ждать, пока не завершится самый медленный запрос, прежде чем я получу все свои данные, что сводит на нет весь смысл потоковой передачи HTTP-запросов.
Есть ли способ сделать что-то подобное? Моя текущая глупая реализация выглядит так:
def collect_data(urls):
"""Non-async function wishing it was a non-async generator"""
async def stream(async_client, url, payload):
data = []
async with async_client.stream("GET", url=url) as ar:
ar.raise_for_status()
async for line in ar.aiter_lines():
data.append(line)
# would like to yield each line here
return data
async def execute_tasks(urls):
all_data = []
async with httpx.AsyncClient() as async_client:
tasks = [stream(async_client, url) for url in urls]
for coroutine in asyncio.as_completed(tasks):
all_data += await coroutine
# would like to iterate and yield each line here
return all_events
try:
loop = asyncio.get_event_loop()
data = loop.run_until_complete(execute_tasks(urls=urls))
return data
# would like to iterate and yield the data here as it becomes available
finally:
loop.close()
РЕДАКТИРОВАТЬ: я пробовал некоторые решения, используяasyncio.Queue
а также trio
каналы памяти, но поскольку я могу читать только те, которые находятся в асинхронной области, это не приближает меня к решению
РЕДАКТИРОВАТЬ 2: Причина, по которой я хочу использовать это из неасинхронного генератора, заключается в том, что я хочу использовать его из приложения Django с помощью потокового API Django Rest Framework.
3 ответа
Обычно вам нужно просто сделать collect_data
async и везде используйте асинхронный код - именно так был разработан asyncio. Но если это по какой-то причине невозможно, вы можете выполнить итерацию асинхронного итератора вручную, применив некоторый связующий код:
def iter_over_async(ait, loop):
ait = ait.__aiter__()
async def get_next():
try:
obj = await ait.__anext__()
return False, obj
except StopAsyncIteration:
return True, None
while True:
done, obj = loop.run_until_complete(get_next())
if done:
break
yield obj
Вышеупомянутое работает путем предоставления асинхронного закрытия, которое продолжает извлекать значения из асинхронного итератора с использованием __anext__
magic, возвращая объекты по мере их поступления. Это асинхронное закрытие вызывается с помощьюrun_until_complete()
в цикле внутри обычного генератора синхронизации. (Замыкание фактически возвращает пару индикатора готовности и фактического объекта, чтобы избежать распространенияStopAsyncIteration
через run_until_complete
, который может не поддерживаться.)
Имея это место, вы можете сделать свой execute_tasks
асинхронный генератор (async def
с yield
) и перебрать его, используя:
for chunk in iter_over_async(execute_tasks(urls), loop):
...
Сразу отметим, что этот подход несовместим с asyncio.run
, и в дальнейшем может вызвать проблемы.
Просто хочу обновить решение @user4815162342 для использованияasyncio.run_coroutine_threadsafe
вместоloop.run_until_complete
.
import asyncio
from typing import Any, AsyncGenerator
def _iter_over_async(loop: asyncio.AbstractEventLoop, async_generator: AsyncGenerator):
ait = async_generator.__aiter__()
async def get_next() -> tuple[bool, Any]:
try:
obj = await ait.__anext__()
done = False
except StopAsyncIteration:
obj = None
done = True
return done, obj
while True:
done, obj = asyncio.run_coroutine_threadsafe(get_next(), loop).result()
if done:
break
yield obj
Я также хотел бы добавить, что я нашел такие инструменты весьма полезными в процессе кусочного преобразования синхронного кода в асинхронный код.
Есть хорошая библиотека, которая делает это (и многое другое!) под названием pypeln:
import pypeln as pl
import asyncio
from random import random
async def slow_add1(x):
await asyncio.sleep(random()) # <= some slow computation
return x + 1
async def slow_gt3(x):
await asyncio.sleep(random()) # <= some slow computation
return x > 3
data = range(10) # [0, 1, 2, ..., 9]
stage = pl.task.map(slow_add1, data, workers=3, maxsize=4)
stage = pl.task.filter(slow_gt3, stage, workers=2)
data = list(stage) # e.g. [5, 6, 9, 4, 8, 10, 7]