Возможно возвращение данных генератора asyncio из цикла событий?

Я хотел бы читать из нескольких одновременных потоковых HTTP-запросов внутри сопрограмм с использованием httpx и возвращать данные моей неасинхронной функции, запускающей цикл событий, а не просто возвращать окончательные данные.

Но если я сделаю свои асинхронные функции yield вместо return, я получаю жалобы, что asyncio.as_completed() а также loop.run_until_complete() ожидает сопрограмму или будущее, а не асинхронный генератор.

Таким образом, единственный способ заставить это работать вообще - это собрать все потоковые данные внутри каждой сопрограммы и вернуть все данные после завершения запроса. Затем соберите все результаты сопрограммы и, наконец, верните их неасинхронной вызывающей функции.

Это означает, что я должен хранить все в памяти и ждать, пока не завершится самый медленный запрос, прежде чем я получу все свои данные, что сводит на нет весь смысл потоковой передачи HTTP-запросов.

Есть ли способ сделать что-то подобное? Моя текущая глупая реализация выглядит так:

def collect_data(urls):
    """Non-async function wishing it was a non-async generator"""

    async def stream(async_client, url, payload):
        data = []
        async with async_client.stream("GET", url=url) as ar:
            ar.raise_for_status()
            async for line in ar.aiter_lines():
                data.append(line)
                # would like to yield each line here
        return data

    async def execute_tasks(urls):
        all_data = []
        async with httpx.AsyncClient() as async_client:
            tasks = [stream(async_client, url) for url in urls]
            for coroutine in asyncio.as_completed(tasks):
                all_data += await coroutine
                # would like to iterate and yield each line here
        return all_events

    try:
        loop = asyncio.get_event_loop()
        data = loop.run_until_complete(execute_tasks(urls=urls))
        return data
        # would like to iterate and yield the data here as it becomes available
    finally:
        loop.close()

РЕДАКТИРОВАТЬ: я пробовал некоторые решения, используяasyncio.Queue а также trio каналы памяти, но поскольку я могу читать только те, которые находятся в асинхронной области, это не приближает меня к решению

РЕДАКТИРОВАТЬ 2: Причина, по которой я хочу использовать это из неасинхронного генератора, заключается в том, что я хочу использовать его из приложения Django с помощью потокового API Django Rest Framework.

3 ответа

Решение

Обычно вам нужно просто сделать collect_dataasync и везде используйте асинхронный код - именно так был разработан asyncio. Но если это по какой-то причине невозможно, вы можете выполнить итерацию асинхронного итератора вручную, применив некоторый связующий код:

def iter_over_async(ait, loop):
    ait = ait.__aiter__()
    async def get_next():
        try:
            obj = await ait.__anext__()
            return False, obj
        except StopAsyncIteration:
            return True, None
    while True:
        done, obj = loop.run_until_complete(get_next())
        if done:
            break
        yield obj

Вышеупомянутое работает путем предоставления асинхронного закрытия, которое продолжает извлекать значения из асинхронного итератора с использованием __anext__magic, возвращая объекты по мере их поступления. Это асинхронное закрытие вызывается с помощьюrun_until_complete()в цикле внутри обычного генератора синхронизации. (Замыкание фактически возвращает пару индикатора готовности и фактического объекта, чтобы избежать распространенияStopAsyncIteration через run_until_complete, который может не поддерживаться.)

Имея это место, вы можете сделать свой execute_tasks асинхронный генератор (async def с yield) и перебрать его, используя:

for chunk in iter_over_async(execute_tasks(urls), loop):
    ...

Сразу отметим, что этот подход несовместим с asyncio.run, и в дальнейшем может вызвать проблемы.

Просто хочу обновить решение @user4815162342 для использованияasyncio.run_coroutine_threadsafeвместоloop.run_until_complete.

      import asyncio
from typing import Any, AsyncGenerator

def _iter_over_async(loop: asyncio.AbstractEventLoop, async_generator: AsyncGenerator):
    ait = async_generator.__aiter__()

    async def get_next() -> tuple[bool, Any]:
        try:
            obj = await ait.__anext__()
            done = False

        except StopAsyncIteration:
            obj = None
            done = True

        return done, obj

    while True:
        done, obj = asyncio.run_coroutine_threadsafe(get_next(), loop).result()

        if done:
            break

        yield obj

Я также хотел бы добавить, что я нашел такие инструменты весьма полезными в процессе кусочного преобразования синхронного кода в асинхронный код.

Есть хорошая библиотека, которая делает это (и многое другое!) под названием pypeln:

      import pypeln as pl
import asyncio
from random import random

async def slow_add1(x):
    await asyncio.sleep(random()) # <= some slow computation
    return x + 1

async def slow_gt3(x):
    await asyncio.sleep(random()) # <= some slow computation
    return x > 3

data = range(10) # [0, 1, 2, ..., 9] 

stage = pl.task.map(slow_add1, data, workers=3, maxsize=4)
stage = pl.task.filter(slow_gt3, stage, workers=2)

data = list(stage) # e.g. [5, 6, 9, 4, 8, 10, 7]
Другие вопросы по тегам