Несколько асинхронных контекстных менеджеров
Можно ли объединить асинхронные контекстные менеджеры в python? Что-то похожее asyncio.gather
, но может использоваться с контекстными менеджерами. Что-то вроде этого:
async def foo():
async with asyncio.gather_cm(start_vm(), start_vm()) as vm1, vm2:
await vm1.do_something()
await vm2.do_something()
Возможно ли это в настоящее время?
1 ответ
Нечто близкое к gather_cm
может быть достигнуто с AsyncExitStack
, введенный в Python 3.7:
async def foo():
async with AsyncExitStack() as stack:
vm1, vm2 = await asyncio.gather(
stack.enter_async_context(start_vm()),
stack.enter_async_context(start_vm()))
await vm1.do_something()
await vm2.do_something()
К несчастью, __aexit__
s будет по-прежнему выполняться последовательно. Это потому что AsyncExitStack
имитирует вложенные контекстные менеджеры, которые имеют четко определенный порядок и не могут перекрываться. Менеджер внешнего контекста __aexit__
предоставляется информация о том, вызвало ли внутреннее исключение. (Дескриптор базы данных __aexit__
может использовать это для отката транзакции в случае исключения и фиксации в противном случае. __aexit__
Параллельно с этим диспетчеры контекста будут перекрываться, а информация об исключениях будет недоступна или ненадежна. Так что хотя gather(...)
работает __aenter__
параллельно AsyncExitStack
записи, который пришел первым и запускает __aexit__
в обратном порядке.
С асинхронными контекстными менеджерами есть альтернатива gather_cm
будет иметь смысл. Можно отбросить семантику вложенности и создать менеджер совокупного контекста, который работал бы как "пул выхода", а не как стек. Выходной пул принимает несколько контекстных менеджеров, которые не зависят друг от друга, что позволяет их __aenter__
а также __aexit__
методы должны быть запущены параллельно.
Сложная задача - правильно обрабатывать исключения: если таковые имеются __aenter__
поднимает, исключение должно распространяться, чтобы предотвратить with
блокировать от запуска. Для обеспечения правильности бассейн должен гарантировать, что __aexit__
будет вызываться на всех менеджерах контекста, чьи __aenter__
завершено.
Вот пример реализации:
import asyncio
import sys
class gather_cm:
def __init__(self, *cms):
self._cms = cms
async def __aenter__(self):
futs = [asyncio.create_task(cm.__aenter__())
for cm in self._cms]
await asyncio.wait(futs)
# only exit the cms we've successfully entered
self._cms = [cm for cm, fut in zip(self._cms, futs)
if not fut.cancelled() and not fut.exception()]
try:
return tuple(fut.result() for fut in futs)
except:
await self._exit(*sys.exc_info())
raise
async def _exit(self, *args):
# don't use gather() to ensure that we wait for all __aexit__s
# to complete even if one of them raises
done, _pending = await asyncio.wait(
[cm.__aexit__(*args)
for cm in self._cms if cm is not None])
return all(suppress.result() for suppress in done)
async def __aexit__(self, *args):
# Since exits are running in parallel, so they can't see each
# other exceptions. Send exception info from `async with`
# body to all.
return await self._exit(*args)
Эта тестовая программа показывает, как она работает:
class test_cm:
def __init__(self, x):
self.x = x
async def __aenter__(self):
print('__aenter__', self.x)
return self.x
async def __aexit__(self, *args):
print('__aexit__', self.x, args)
async def foo():
async with gather_cm(test_cm('foo'), test_cm('bar')) as (cm1, cm2):
print('cm1', cm1)
print('cm2', cm2)
asyncio.run(foo())