Как я могу асинхронно отобразить / отфильтровать асинхронную итерацию?

Допустим, у меня есть асинхронная итерация, которую я могу передать, используя async forКак тогда я могу отобразить и отфильтровать его на новый асинхронный итератор? Следующий код, который является адаптацией того, как я делал бы то же самое с синхронной итерацией, не работает, так как yield не допускается внутри async defs.

async def mapfilter(aiterable, p, func):
    async for payload in aiterable:
        if p(payload):

            # This part isn't allowed, but hopefully it should be clear
            # what I'm trying to accomplish.
            yield func(payload)

4 ответа

Решение

Недавно опубликованный проект PEP (PEP 525), поддержка которого запланирована на Python 3.6, предлагает разрешить асинхронные генераторы с тем же синтаксисом, который вы придумали.

Между тем, вы также можете использовать asyncio_extras Библиотека, упомянутая CryingCyclops в ее комментарии, если вы не хотите иметь дело с CryingCyclops асинхронного итератора.

Из документов:

@async_generator
async def mygenerator(websites):
    for website in websites:
        page = await http_fetch(website)
        await yield_async(page)

async def fetch_pages():
    websites = ('http://foo.bar', 'http://example.org')
    async for sanitized_page in mygenerator(websites):
        print(sanitized_page)

Существует также библиотека async_generator, которая поддерживает yield from строит.

Плохо названная сторонняя библиотека asyncstdlibпредоставляет фильтр и карту для асинхронных итераций.

Вы не можете использовать yield внутри сопрограмм. Чтобы реализовать вашу идею, я вижу только один способ - реализовать асинхронный итератор. Если я прав, что-то в этом роде

class MapFilter:
    def __init__(self, aiterable, p, func):
        self.aiterable = aiterable
        self.p = p
        self.func = func

    async def __aiter__(self):
        return self

    async def __anext__(self):
        while True:
            payload = await self.aiterable.__anext__()  # StopAsyncIteration would be raise here on no new values
            if self.p(payload):
                return self.func(payload)

Давайте проверим это. Вот полный пример с помощником arange класс (я взял это отсюда):

import asyncio


class arange:
    def __init__(self, n):
        self.n = n
        self.i = 0

    async def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        self.i += 1
        if self.i <= self.n:
            await asyncio.sleep(0)  # insert yield point
            return i
        else:
            raise StopAsyncIteration


class MapFilter:
    def __init__(self, aiterable, p, func):
        self.aiterable = aiterable
        self.p = p
        self.func = func

    async def __aiter__(self):
        return self

    async def __anext__(self):
        while True:
            payload = await self.aiterable.__anext__()
            if self.p(payload):
                return self.func(payload)


async def main():
    aiterable = arange(5)
    p = lambda x: bool(x>2)
    func = lambda x: x*2

    async for i in MapFilter(aiterable, p, func):
        print(i)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Выход:

6
8

https://gist.github.com/dvdotsenko/d8e0068775ac04b58993f604f122284f

асинхронный map и filter реализация для Python 3.6+, специально разработанная для возврата подзадач в неупорядоченном порядке, в зависимости от того, что будет выполнено раньше.

from collections import deque
from typing import Any, Callable, Collection, AsyncIterator, Iterator, Union


async def _next(gg):
    # repackaging non-asyncio next() as async-like anext()
    try:
        return next(gg)
    except StopIteration:
        raise StopAsyncIteration


async def _aionext(gg):
    # there is no anext() :(
    return await gg.__anext__()


async def map_unordered(fn:Callable, args:Union[Iterator,Collection,AsyncIterator], maxsize=None):
    """
    Async generator yielding return values of resolved invocations
    of `fn` against arg in args list

    Arguments are consumed and fed to callable in the order they are presented in args.
    Results are yielded NOT in order of args. Earliest done is yielded.

    If `size` is specified, worker tasks pool is constrained to that size.

    This is asyncio equivalent of Gevent's `imap_unordered(fn, args_iterable, pool_size)`
    http://www.gevent.org/api/gevent.pool.html#gevent.pool.Group.imap_unordered

    `args` may be Async Iterator or regular Iterator. 
     Thus, you can chain `map_unordered` as `args` for another `map_unordered`

    Because this is an async generator, cannot consume it as regular iterable.
    Must use `async for`.

    Usage example:

            # note NO await in this assignment
            gen = map_unordered(fn, arguments_iter, maxsize=3)
            async for returned_value in gen:
                yield returned_value

    """
    if maxsize == 0:
        raise ValueError(
            'Argument `maxsize` cannot be set to zero. '
            'Use `None` to indicate no limit.'
        )

    # Make args list consumable like a generator
    # so repeated islice(args, size) calls against `args` move slice down the list.

    if hasattr(args, '__anext__'):
        n = _aionext
    elif hasattr(args, '__next__'):
        n = _next
    else:
        args = iter(args)
        n = _next

    have_args = True  # assumed. Don't len(args).
    pending_tasks = deque()

    while have_args or len(pending_tasks):
        try:
            while len(pending_tasks) != maxsize:
                arg = await n(args)
                pending_tasks.append(
                    asyncio.Task(fn(arg))
                )
        except StopAsyncIteration:
            have_args = False

        if not len(pending_tasks):
            return

        done, pending_tasks = await asyncio.wait(pending_tasks, return_when=asyncio.FIRST_COMPLETED)
        pending_tasks = deque(pending_tasks)

        for task in done:
            yield await task  # await converts task object into its return value


async def _filter_wrapper(fn, arg):
    return (await fn(arg)), arg

async def _filter_none(arg):
    return not (arg is None)

async def filter_unordered(fn:Union[Callable,None], args:Union[Iterator,Collection,AsyncIterator], maxsize=None):
    """
    Async filter generator yielding values of `args` collection that match filter condition.
    Like python's native `filter([Callable|None], iterable)` but:
    - allows iterable to be async iterator
    - allows callable to be async callable
    - returns results OUT OF ORDER - whichever passes filter test first.

    Arguments are consumed and fed to callable in the order they are presented in args.
    Results are yielded NOT in order of args. Earliest done and passing the filter condition is yielded.

    If `maxsize` is specified, worker tasks pool is constrained to that size.

    This is inspired by Gevent's `imap_unordered(fn, args_iterable, pool_size)`
    http://www.gevent.org/api/gevent.pool.html#gevent.pool.Group.imap_unordered

    Because this is an async generator, cannot consume it as regular iterable.
    Must use `async for`.

    Usage example:

            # note NO await in this assignment
            gen = filter_unordered(fn, arguments_iter, maxsize=3)
            async for returned_value in gen:
                yield returned_value

    """
    if maxsize == 0:
        raise ValueError(
            'Argument `maxsize` cannot be set to zero. '
            'Use `None` to indicate no limit.'
        )

    if hasattr(args, '__anext__'):
        n = _aionext
    elif hasattr(args, '__next__'):
        n = _next
    else:
        args = iter(args)
        n = _next

    if fn is None:
        fn = _filter_none

    have_args = True  # assumed. Don't len(args).
    pending_tasks = deque()

    while have_args or len(pending_tasks):
        try:
            while len(pending_tasks) != maxsize:
                arg = await n(args)
                pending_tasks.append(
                    asyncio.Task(_filter_wrapper(fn,arg))
                )
        except StopAsyncIteration:
            have_args = False

        if not len(pending_tasks):
            return

        done, pending_tasks = await asyncio.wait(pending_tasks, return_when=asyncio.FIRST_COMPLETED)
        pending_tasks = deque(pending_tasks)

        for task in done:
            filter_match, arg = await task
            if filter_match:
                yield arg

Работает как Gevent imap_unorderedно, в отличие от версии Gevent, также позволяет итеративным аргументам быть генератором асинхронных значений. Означает, что их можно связать.

Дано:

async def worker(seconds):
    print('> Start wait', seconds)
    await asyncio.sleep(seconds)
    print('< End wait', seconds)
    return seconds


async def to_aio_gen(ll):
    for e in ll:
        yield e

async def test_map(ll, size=None):
    t = time.time()
    async for v in map_unordered(worker, ll, maxsize=size):
        print('-- elapsed second', round(time.time() - t, 1), ' received value', v)


ll = [
    0.2,
    0.4,
    0.8,
    1.2,
    1.1,
    0.3,
    0.6,
    0.9,
]

Результаты тестирования:

неасинхронный итерабельный, размер пула = 3

>>> asyncio.run(test_map(ll, 3))
> Start wait 0.2
> Start wait 0.4
> Start wait 0.8
< End wait 0.2
-- elapsed second 0.2  received value 0.2
> Start wait 1.2
< End wait 0.4
-- elapsed second 0.4  received value 0.4
> Start wait 1.1
< End wait 0.8
-- elapsed second 0.8  received value 0.8
> Start wait 0.3
< End wait 0.3
-- elapsed second 1.1  received value 0.3
> Start wait 0.6
< End wait 1.2
-- elapsed second 1.4  received value 1.2
> Start wait 0.9
< End wait 1.1
-- elapsed second 1.5  received value 1.1
< End wait 0.6
-- elapsed second 1.7  received value 0.6
< End wait 0.9
-- elapsed second 2.3  received value 0.9

Асинхронный итератор как список аргументов, размер пула = 3, фильтр

async def more_than_half(v):
    await asyncio.sleep(v)
    return v > 0.5

>>> asyncio.run(filter_unordered(more_than_half, aio_gen(ll), 3))
-- elapsed second 0.8  received value 0.8
-- elapsed second 1.4  received value 1.2
-- elapsed second 1.5  received value 1.1
-- elapsed second 1.7  received value 0.6
-- elapsed second 2.3  received value 0.9
Другие вопросы по тегам