Как правильно создавать и запускать параллельные задачи с помощью модуля Python asyncio?
Я пытаюсь правильно понять и реализовать два одновременно работающих Task
объекты, использующие Python 3 относительно новый asyncio
модуль.
Короче говоря, Asyncio, кажется, предназначен для обработки асинхронных процессов и одновременных Task
выполнение по циклу событий. Это способствует использованию await
(применяется в асинхронных функциях) как способ без обратного вызова для ожидания и использования результата, не блокируя цикл обработки событий. (Фьючерсы и обратные вызовы по-прежнему являются жизнеспособной альтернативой.)
Это также обеспечивает asyncio.Task()
класс, специализированный подкласс Future
предназначен для упаковки сопрограмм. Предпочтительно вызывается с помощью asyncio.ensure_future()
метод. Предполагаемое использование асинхронных задач - позволить независимо выполняющимся задачам запускаться "одновременно" с другими задачами в рамках одного и того же цикла событий. Я понимаю, что Tasks
подключены к циклу событий, который затем автоматически продолжает движение сопрограммы между await
заявления.
Мне нравится идея возможности одновременного использования Задач без необходимости использования одного из Executor
классы, но я не нашел много разработок по реализации.
Вот как я сейчас это делаю:
import asyncio
print('running async test')
async def say_boo():
i = 0
while True:
await asyncio.sleep(0)
print('...boo {0}'.format(i))
i += 1
async def say_baa():
i = 0
while True:
await asyncio.sleep(0)
print('...baa {0}'.format(i))
i += 1
# wrap in Task object
# -> automatically attaches to event loop and executes
boo = asyncio.ensure_future(say_boo())
baa = asyncio.ensure_future(say_baa())
loop = asyncio.get_event_loop()
loop.run_forever()
В случае одновременного запуска двух циклических задач я заметил, что, если у задачи нет внутреннего await
выражение застрянет в while
цикл, эффективно блокирующий выполнение других задач (очень похоже на обычный while
петля). Однако, как только Задачам приходится (а) ждать, они, похоже, запускаются одновременно без проблем.
Таким образом await
операторы, кажется, обеспечивают цикл событий опорой для переключения между задачами, создавая эффект параллелизма.
Пример вывода с внутренней await
:
running async test
...boo 0
...baa 0
...boo 1
...baa 1
...boo 2
...baa 2
Пример вывода без внутреннего await
:
...boo 0
...boo 1
...boo 2
...boo 3
...boo 4
Вопросы
Проходит ли эта реализация "правильный" пример параллельных циклических задач в asyncio
?
Верно ли, что это работает только для Task
обеспечить точку блокировки (await
выражение) для того, чтобы цикл событий манипулировал несколькими задачами?
3 ответа
Да, любая сопрограмма, которая выполняется внутри вашего цикла событий, будет блокировать выполнение других сопрограмм и задач, если только
- Вызывает другую сопрограмму, используя
yield from
или жеawait
(если используется Python 3.5+). - Возвращает.
Это потому что asyncio
однопоточный; единственный способ запустить цикл обработки событий - это отсутствие других сопрограмм, которые будут активно выполняться. С помощью yield from
/ await
временно приостанавливает сопрограмму, давая возможность циклу обработки событий работать.
Ваш пример кода в порядке, но во многих случаях вам, вероятно, не понадобится долго работающий код, который не выполняет асинхронный ввод-вывод внутри цикла обработки событий. В этих случаях часто имеет смысл использовать BaseEventLoop.run_in_executor
запустить код в фоновом потоке или процессе. ProcessPoolExecutor
будет лучшим выбором, если ваша задача связана с процессором, ThreadPoolExecutor
будет использоваться, если вам нужно сделать некоторые I/O, которые не asyncio
-дружелюбный.
Например, ваши два цикла полностью связаны с процессором и не имеют общего состояния, поэтому наилучшая производительность достигается при использовании ProcessPoolExecutor
чтобы запустить каждый цикл параллельно между процессорами:
import asyncio
from concurrent.futures import ProcessPoolExecutor
print('running async test')
def say_boo():
i = 0
while True:
print('...boo {0}'.format(i))
i += 1
def say_baa():
i = 0
while True:
print('...baa {0}'.format(i))
i += 1
if __name__ == "__main__":
executor = ProcessPoolExecutor(2)
loop = asyncio.get_event_loop()
boo = asyncio.ensure_future(loop.run_in_executor(executor, say_boo))
baa = asyncio.ensure_future(loop.run_in_executor(executor, say_baa))
loop.run_forever()
Функцииasyncio.ensure_future
иasyncio.get_event_loop
устарели в Python 3.10.
Вы можете запустить две сопрограммыsay_boo
иsay_baa
одновременно черезasyncio.create_task
:
async def main():
boo = asyncio.create_task(say_boo())
baa = asyncio.create_task(say_baa())
await boo
await baa
asyncio.run(main())
Вы также можете использоватьasyncio.gather
async def main():
await asyncio.gather(say_boo(), say_baa())
asyncio.run(main())
Вам не обязательно нужен yield from x
передать контроль над циклом событий.
В вашем примере, я думаю, что правильным способом было бы сделать yield None
или эквивалентно простой yield
, а не yield from asyncio.sleep(0.001)
:
import asyncio
@asyncio.coroutine
def say_boo():
i = 0
while True:
yield None
print("...boo {0}".format(i))
i += 1
@asyncio.coroutine
def say_baa():
i = 0
while True:
yield
print("...baa {0}".format(i))
i += 1
boo_task = asyncio.async(say_boo())
baa_task = asyncio.async(say_baa())
loop = asyncio.get_event_loop()
loop.run_forever()
Сопрограммы - это просто старые генераторы Python. Внутренне asyncio
Цикл событий хранит записи об этих генераторах и вызовах gen.send()
на каждом из них по одному в бесконечном цикле. Всякий раз, когда вам yield
Призыв к gen.send()
завершается, и цикл может двигаться дальше. (Я упрощаю это; посмотрите на https://hg.python.org/cpython/file/3.4/Lib/asyncio/tasks.py фактический код)
Тем не менее, я все равно пойду run_in_executor
маршрут, если вам нужно делать интенсивные вычисления процессора без обмена данными.