Как правильно создавать и запускать параллельные задачи с помощью модуля Python asyncio?

Я пытаюсь правильно понять и реализовать два одновременно работающих Task объекты, использующие Python 3 относительно новый asyncio модуль.

Короче говоря, Asyncio, кажется, предназначен для обработки асинхронных процессов и одновременных Task выполнение по циклу событий. Это способствует использованию await (применяется в асинхронных функциях) как способ без обратного вызова для ожидания и использования результата, не блокируя цикл обработки событий. (Фьючерсы и обратные вызовы по-прежнему являются жизнеспособной альтернативой.)

Это также обеспечивает asyncio.Task() класс, специализированный подкласс Future предназначен для упаковки сопрограмм. Предпочтительно вызывается с помощью asyncio.ensure_future() метод. Предполагаемое использование асинхронных задач - позволить независимо выполняющимся задачам запускаться "одновременно" с другими задачами в рамках одного и того же цикла событий. Я понимаю, что Tasks подключены к циклу событий, который затем автоматически продолжает движение сопрограммы между await заявления.

Мне нравится идея возможности одновременного использования Задач без необходимости использования одного из Executor классы, но я не нашел много разработок по реализации.

Вот как я сейчас это делаю:

import asyncio

print('running async test')

async def say_boo():
    i = 0
    while True:
        await asyncio.sleep(0)
        print('...boo {0}'.format(i))
        i += 1

async def say_baa():
    i = 0
    while True:
        await asyncio.sleep(0)
        print('...baa {0}'.format(i))
        i += 1

# wrap in Task object
# -> automatically attaches to event loop and executes
boo = asyncio.ensure_future(say_boo())
baa = asyncio.ensure_future(say_baa())

loop = asyncio.get_event_loop()
loop.run_forever()

В случае одновременного запуска двух циклических задач я заметил, что, если у задачи нет внутреннего await выражение застрянет в while цикл, эффективно блокирующий выполнение других задач (очень похоже на обычный while петля). Однако, как только Задачам приходится (а) ждать, они, похоже, запускаются одновременно без проблем.

Таким образом await операторы, кажется, обеспечивают цикл событий опорой для переключения между задачами, создавая эффект параллелизма.

Пример вывода с внутренней await:

running async test
...boo 0
...baa 0
...boo 1
...baa 1
...boo 2
...baa 2

Пример вывода без внутреннего await:

...boo 0
...boo 1
...boo 2
...boo 3
...boo 4

Вопросы

Проходит ли эта реализация "правильный" пример параллельных циклических задач в asyncio?

Верно ли, что это работает только для Task обеспечить точку блокировки (await выражение) для того, чтобы цикл событий манипулировал несколькими задачами?

3 ответа

Решение

Да, любая сопрограмма, которая выполняется внутри вашего цикла событий, будет блокировать выполнение других сопрограмм и задач, если только

  1. Вызывает другую сопрограмму, используя yield from или же await (если используется Python 3.5+).
  2. Возвращает.

Это потому что asyncio однопоточный; единственный способ запустить цикл обработки событий - это отсутствие других сопрограмм, которые будут активно выполняться. С помощью yield from / await временно приостанавливает сопрограмму, давая возможность циклу обработки событий работать.

Ваш пример кода в порядке, но во многих случаях вам, вероятно, не понадобится долго работающий код, который не выполняет асинхронный ввод-вывод внутри цикла обработки событий. В этих случаях часто имеет смысл использовать BaseEventLoop.run_in_executor запустить код в фоновом потоке или процессе. ProcessPoolExecutor будет лучшим выбором, если ваша задача связана с процессором, ThreadPoolExecutor будет использоваться, если вам нужно сделать некоторые I/O, которые не asyncio -дружелюбный.

Например, ваши два цикла полностью связаны с процессором и не имеют общего состояния, поэтому наилучшая производительность достигается при использовании ProcessPoolExecutor чтобы запустить каждый цикл параллельно между процессорами:

import asyncio
from concurrent.futures import ProcessPoolExecutor

print('running async test')

def say_boo():
    i = 0
    while True:
        print('...boo {0}'.format(i))
        i += 1


def say_baa():
    i = 0
    while True:
        print('...baa {0}'.format(i))
        i += 1

if __name__ == "__main__":
    executor = ProcessPoolExecutor(2)
    loop = asyncio.get_event_loop()
    boo = asyncio.ensure_future(loop.run_in_executor(executor, say_boo))
    baa = asyncio.ensure_future(loop.run_in_executor(executor, say_baa))

    loop.run_forever()

Функцииasyncio.ensure_futureиasyncio.get_event_loopустарели в Python 3.10.

Вы можете запустить две сопрограммыsay_booиsay_baaодновременно черезasyncio.create_task:

      async def main():
    boo = asyncio.create_task(say_boo())
    baa = asyncio.create_task(say_baa())
    await boo
    await baa

asyncio.run(main())

Вы также можете использоватьasyncio.gather

      async def main():
    await asyncio.gather(say_boo(), say_baa())

asyncio.run(main())

Вам не обязательно нужен yield from x передать контроль над циклом событий.

В вашем примере, я думаю, что правильным способом было бы сделать yield None или эквивалентно простой yield, а не yield from asyncio.sleep(0.001):

import asyncio

@asyncio.coroutine
def say_boo():
  i = 0
  while True:
    yield None
    print("...boo {0}".format(i))
    i += 1

@asyncio.coroutine
def say_baa():
  i = 0
  while True:
    yield
    print("...baa {0}".format(i))
    i += 1

boo_task = asyncio.async(say_boo())
baa_task = asyncio.async(say_baa())

loop = asyncio.get_event_loop()
loop.run_forever()

Сопрограммы - это просто старые генераторы Python. Внутренне asyncio Цикл событий хранит записи об этих генераторах и вызовах gen.send() на каждом из них по одному в бесконечном цикле. Всякий раз, когда вам yieldПризыв к gen.send() завершается, и цикл может двигаться дальше. (Я упрощаю это; посмотрите на https://hg.python.org/cpython/file/3.4/Lib/asyncio/tasks.py фактический код)

Тем не менее, я все равно пойду run_in_executor маршрут, если вам нужно делать интенсивные вычисления процессора без обмена данными.

Другие вопросы по тегам