Асинхронный Python itertools цепочка нескольких генераторов

ОБНОВЛЕННЫЙ ВОПРОС ДЛЯ ЯСНОСТИ

Предположим, у меня есть 2 функции генератора обработки:

def gen1(): # just for examples,
  yield 1   # yields actually carry 
  yield 2   # different computation weight 
  yield 3   # in my case

def gen2():
  yield 4
  yield 5
  yield 6

Я могу связать их с помощью itertools

from itertools import chain

mix = chain(gen1(), gen2())

и тогда я могу создать другой объект функции генератора с ним,

def mix_yield():
   for item in mix:
      yield item

или просто если я просто хочу next(mix), Это здесь.

У меня вопрос, как я могу сделать эквивалент в асинхронном коде?

Потому что мне это нужно для:

  • возврат в доход (один за другим), или с next итератор
  • самый быстрый разрешенный доход первым (асинхронный)

ПРЕД. ОБНОВИТЬ:

После экспериментов и исследований я обнаружил библиотеку aiostream, которая сообщает как асинхронную версию itertools, и вот что я сделал:

import asyncio
from aiostream import stream

async def gen1(): 
     await asyncio.sleep(0) 
     yield 1 
     await asyncio.sleep(0) 
     yield 2 
     await asyncio.sleep(0) 
     yield 3 

async def gen2(): 
     await asyncio.sleep(0) 
     yield 4 
     await asyncio.sleep(0) 
     yield 5 
     await asyncio.sleep(0) 
     yield 6 

a_mix = stream.combine.merge(gen1(),gen2())

async def a_mix_yield():
   for item in a_mix:
      yield item

но я все еще не могу сделать next(a_mix)

TypeError: 'merge' object is not an iterator

или же next(await a_mix)

raise StreamEmpty()

Хотя я все еще могу сделать это в списке:

print(await stream.list(a_mix))
# [1, 2, 4, 3, 5, 6]

Итак, одна цель достигнута, еще одна уйдет:

  • возврат в доход (один за другим), или с next итератор

    - самый быстрый разрешенный доход первым (асинхронный)

1 ответ

Решение

Асинхронный эквивалент next это __anext__ метод на асинхронном итераторе. Итератор в свою очередь получается путем вызова __aiter__ (по аналогии с __iter__) на повторяемом. Развернутая асинхронная итерация выглядит так:

a_iterator = obj.__aiter__()          # regular method
elem1 = await a_iterator.__anext__()  # async method
elem2 = await a_iterator.__anext__()  # async method
...

__anext__ метод поднимет StopAsyncIteration когда больше нет доступных элементов. Чтобы перебрать асинхронные итераторы, вы должны использовать async for скорее, чем for,

Вот работающий пример, основанный на вашем коде, использующий оба __anext__ а также async for исчерпать поток, созданный с aiostream.stream.combine.merge:

async def main():
    a_mix = stream.combine.merge(gen1(), gen2())
    async with a_mix.stream() as streamer:
        mix_iter = streamer.__aiter__()    
        print(await mix_iter.__anext__())
        print(await mix_iter.__anext__())
        print('remaining:')
        async for x in mix_iter:
            print(x)

asyncio.get_event_loop().run_until_complete(main())

Я наткнулся на этот ответ и посмотрел библиотеку aiostream. Вот код, который я придумал для объединения нескольких генераторов async. Он не использует никаких библиотек.

async def merge_generators(gens:Set[AsyncGenerator[Any, None]]) -> AsyncGenerator[Any, None]:
    pending = gens.copy()
    pending_tasks = { asyncio.ensure_future(g.__anext__()): g for g in pending }
    while len(pending_tasks) > 0:
        done, _ = await asyncio.wait(pending_tasks.keys(), return_when="FIRST_COMPLETED")
        for d in done:
            try:
                result = d.result()
                yield result
                dg = pending_tasks[d]
                pending_tasks[asyncio.ensure_future(dg.__anext__())] = dg
            except StopAsyncIteration as sai:
                print("Exception in getting result", sai)
            finally:
                del pending_tasks[d]

Надеюсь, это поможет вам, и дайте мне знать, если в этом есть какие-либо ошибки.

Другие вопросы по тегам