Несколько асинхронных контекстных менеджеров

Можно ли объединить асинхронные контекстные менеджеры в python? Что-то похожее asyncio.gather, но может использоваться с контекстными менеджерами. Что-то вроде этого:

async def foo():
    async with asyncio.gather_cm(start_vm(), start_vm()) as vm1, vm2:
        await vm1.do_something()
        await vm2.do_something()

Возможно ли это в настоящее время?

1 ответ

Решение

Нечто близкое к gather_cm может быть достигнуто с AsyncExitStack, введенный в Python 3.7:

async def foo():
    async with AsyncExitStack() as stack:
        vm1, vm2 = await asyncio.gather(
            stack.enter_async_context(start_vm()),
            stack.enter_async_context(start_vm()))
        await vm1.do_something()
        await vm2.do_something()

К несчастью, __aexit__ s будет по-прежнему выполняться последовательно. Это потому что AsyncExitStack имитирует вложенные контекстные менеджеры, которые имеют четко определенный порядок и не могут перекрываться. Менеджер внешнего контекста __aexit__ предоставляется информация о том, вызвало ли внутреннее исключение. (Дескриптор базы данных __aexit__ может использовать это для отката транзакции в случае исключения и фиксации в противном случае. __aexit__ Параллельно с этим диспетчеры контекста будут перекрываться, а информация об исключениях будет недоступна или ненадежна. Так что хотя gather(...) работает __aenter__ параллельно AsyncExitStack записи, который пришел первым и запускает __aexit__ в обратном порядке.

С асинхронными контекстными менеджерами есть альтернатива gather_cm будет иметь смысл. Можно отбросить семантику вложенности и создать менеджер совокупного контекста, который работал бы как "пул выхода", а не как стек. Выходной пул принимает несколько контекстных менеджеров, которые не зависят друг от друга, что позволяет их __aenter__ а также __aexit__ методы должны быть запущены параллельно.

Сложная задача - правильно обрабатывать исключения: если таковые имеются __aenter__ поднимает, исключение должно распространяться, чтобы предотвратить with блокировать от запуска. Для обеспечения правильности бассейн должен гарантировать, что __aexit__ будет вызываться на всех менеджерах контекста, чьи __aenter__ завершено.

Вот пример реализации:

import asyncio
import sys

class gather_cm:
    def __init__(self, *cms):
        self._cms = cms

    async def __aenter__(self):
        futs = [asyncio.create_task(cm.__aenter__())
                for cm in self._cms]
        await asyncio.wait(futs)
        # only exit the cms we've successfully entered
        self._cms = [cm for cm, fut in zip(self._cms, futs)
                     if not fut.cancelled() and not fut.exception()]
        try:
            return tuple(fut.result() for fut in futs)
        except:
            await self._exit(*sys.exc_info())
            raise

    async def _exit(self, *args):
        # don't use gather() to ensure that we wait for all __aexit__s
        # to complete even if one of them raises
        done, _pending = await asyncio.wait(
            [cm.__aexit__(*args)
             for cm in self._cms if cm is not None])
        return all(suppress.result() for suppress in done)

    async def __aexit__(self, *args):
        # Since exits are running in parallel, so they can't see each
        # other exceptions.  Send exception info from `async with`
        # body to all.
        return await self._exit(*args)

Эта тестовая программа показывает, как она работает:

class test_cm:
    def __init__(self, x):
        self.x = x
    async def __aenter__(self):
        print('__aenter__', self.x)
        return self.x
    async def __aexit__(self, *args):
        print('__aexit__', self.x, args)

async def foo():
    async with gather_cm(test_cm('foo'), test_cm('bar')) as (cm1, cm2):
        print('cm1', cm1)
        print('cm2', cm2)

asyncio.run(foo())
Другие вопросы по тегам