Асинхронный Python itertools цепочка нескольких генераторов
ОБНОВЛЕННЫЙ ВОПРОС ДЛЯ ЯСНОСТИ
Предположим, у меня есть 2 функции генератора обработки:
def gen1(): # just for examples,
yield 1 # yields actually carry
yield 2 # different computation weight
yield 3 # in my case
def gen2():
yield 4
yield 5
yield 6
Я могу связать их с помощью itertools
from itertools import chain
mix = chain(gen1(), gen2())
и тогда я могу создать другой объект функции генератора с ним,
def mix_yield():
for item in mix:
yield item
или просто если я просто хочу next(mix)
, Это здесь.
У меня вопрос, как я могу сделать эквивалент в асинхронном коде?
Потому что мне это нужно для:
- возврат в доход (один за другим), или с
next
итератор - самый быстрый разрешенный доход первым (асинхронный)
ПРЕД. ОБНОВИТЬ:
После экспериментов и исследований я обнаружил библиотеку aiostream, которая сообщает как асинхронную версию itertools, и вот что я сделал:
import asyncio
from aiostream import stream
async def gen1():
await asyncio.sleep(0)
yield 1
await asyncio.sleep(0)
yield 2
await asyncio.sleep(0)
yield 3
async def gen2():
await asyncio.sleep(0)
yield 4
await asyncio.sleep(0)
yield 5
await asyncio.sleep(0)
yield 6
a_mix = stream.combine.merge(gen1(),gen2())
async def a_mix_yield():
for item in a_mix:
yield item
но я все еще не могу сделать next(a_mix)
TypeError: 'merge' object is not an iterator
или же next(await a_mix)
raise StreamEmpty()
Хотя я все еще могу сделать это в списке:
print(await stream.list(a_mix))
# [1, 2, 4, 3, 5, 6]
Итак, одна цель достигнута, еще одна уйдет:
возврат в доход (один за другим), или с
next
итератор- самый быстрый разрешенный доход первым (асинхронный)
1 ответ
Асинхронный эквивалент next
это __anext__
метод на асинхронном итераторе. Итератор в свою очередь получается путем вызова __aiter__
(по аналогии с __iter__
) на повторяемом. Развернутая асинхронная итерация выглядит так:
a_iterator = obj.__aiter__() # regular method
elem1 = await a_iterator.__anext__() # async method
elem2 = await a_iterator.__anext__() # async method
...
__anext__
метод поднимет StopAsyncIteration
когда больше нет доступных элементов. Чтобы перебрать асинхронные итераторы, вы должны использовать async for
скорее, чем for
,
Вот работающий пример, основанный на вашем коде, использующий оба __anext__
а также async for
исчерпать поток, созданный с aiostream.stream.combine.merge
:
async def main():
a_mix = stream.combine.merge(gen1(), gen2())
async with a_mix.stream() as streamer:
mix_iter = streamer.__aiter__()
print(await mix_iter.__anext__())
print(await mix_iter.__anext__())
print('remaining:')
async for x in mix_iter:
print(x)
asyncio.get_event_loop().run_until_complete(main())
Я наткнулся на этот ответ и посмотрел библиотеку aiostream. Вот код, который я придумал для объединения нескольких генераторов async. Он не использует никаких библиотек.
async def merge_generators(gens:Set[AsyncGenerator[Any, None]]) -> AsyncGenerator[Any, None]:
pending = gens.copy()
pending_tasks = { asyncio.ensure_future(g.__anext__()): g for g in pending }
while len(pending_tasks) > 0:
done, _ = await asyncio.wait(pending_tasks.keys(), return_when="FIRST_COMPLETED")
for d in done:
try:
result = d.result()
yield result
dg = pending_tasks[d]
pending_tasks[asyncio.ensure_future(dg.__anext__())] = dg
except StopAsyncIteration as sai:
print("Exception in getting result", sai)
finally:
del pending_tasks[d]
Надеюсь, это поможет вам, и дайте мне знать, если в этом есть какие-либо ошибки.