При использовании asyncio, как вы позволите завершить все запущенные задачи перед завершением цикла событий

У меня есть следующий код:

@asyncio.coroutine
def do_something_periodically():
    while True:
        asyncio.async(my_expensive_operation())
        yield from asyncio.sleep(my_interval)
        if shutdown_flag_is_set:
            print("Shutting down")
            break

Я запускаю эту функцию до завершения. Проблема возникает, когда установлено выключение - функция завершается, и все отложенные задачи никогда не выполняются. (Вы видите это как ошибку

task: <Task pending coro=<report() running at script.py:33> wait_for=<Future pending cb=[Task._wakeup()]>>

). Как правильно запланировать выключение?

Чтобы дать некоторый контекст, я пишу системный монитор, который читает из /proc/stat каждые 5 секунд, вычисляет загрузку процессора за этот период и затем отправляет результат на сервер. Я хочу продолжать планировать эти задания мониторинга до тех пор, пока не получу sigterm, когда я перестану планировать, дождусь завершения всех текущих заданий и выйду изящно.

9 ответов

Решение

Вы можете получить незаконченные задачи и снова запустить цикл, пока они не закончатся, затем закрыть цикл или выйти из программы.

pending = asyncio.Task.all_tasks()
loop.run_until_complete(asyncio.gather(*pending))
  • pending - это список ожидающих выполнения задач.
  • asyncio.gather () позволяет ожидать нескольких задач одновременно.

Если вы хотите убедиться, что все задачи выполнены внутри сопрограммы (возможно, у вас есть "главная" сопрограмма), вы можете сделать это следующим образом, например:

@asyncio.coroutine
def do_something_periodically():
    while True:
        asyncio.async(my_expensive_operation())
        yield from asyncio.sleep(my_interval)
        if shutdown_flag_is_set:
            print("Shutting down")
            break

    yield from asyncio.gather(*asyncio.Task.all_tasks())

Кроме того, в этом случае, поскольку все задачи создаются в одной сопрограмме, у вас уже есть доступ к задачам:

@asyncio.coroutine
def do_something_periodically():
    tasks = []
    while True:
        tasks.append(asyncio.async(my_expensive_operation()))
        yield from asyncio.sleep(my_interval)
        if shutdown_flag_is_set:
            print("Shutting down")
            break

    yield from asyncio.gather(*tasks)

Я заметил некоторые ответы, предложенные с использованием asyncio.gather(*asyncio.all_tasks()), но проблема с этим иногда может заключаться в бесконечном цикле, в котором он ожидает asyncio.current_task()для завершения, что само по себе. В некоторых ответах предлагались некоторые сложные обходные пути, включая проверку coro имена или len(asyncio.all_tasks()), но оказывается, что это очень просто сделать, воспользовавшись set операции:

      async def main():
    # Create some tasks.
    for _ in range(10):
        asyncio.create_task(asyncio.sleep(10))
    # Wait for all other tasks to finish other than the current task i.e. main().
    await asyncio.gather(*asyncio.all_tasks() - {asyncio.current_task()})

Начиная с Python 3.7, в приведенном выше ответе используются несколько устаревших API (asyncio.async и Task.all_tasks,@asyncio.coroutine, yield from и т. Д.), И вам лучше использовать это:

import asyncio

my_interval = 1

shutdown_flag_is_set = False


async def my_expensive_operation():
    print(await asyncio.sleep(10, result="Expensive operation finished."))


async def do_something_periodically(*, loop):
    global shutdown_flag_is_set
    while True:
        loop.create_task(my_expensive_operation())
        await asyncio.sleep(my_interval)
        if shutdown_flag_is_set:
            print("Shutting down")
            break

    await  asyncio.gather(*asyncio.all_tasks(loop))


loop = asyncio.get_event_loop()

try:
    loop.run_until_complete(do_something_periodically(loop=loop))
except KeyboardInterrupt:
    shutdown_flag_is_set = True

Используйте сопрограмму-оболочку, которая ждет, пока счетчик ожидающих задач не станет 1, прежде чем вернуться.

async def loop_job():
    asyncio.create_task(do_something_periodically())
    while len(asyncio.Task.all_tasks()) > 1:  # Any task besides loop_job() itself?
        await asyncio.sleep(0.2)

asyncio.run(loop_job())

Я не уверен, что это то, о чем вы просили, но у меня была аналогичная проблема, и вот окончательное решение, которое я придумал.

Код совместим с Python 3 и использует только общедоступные API-интерфейсы asyncio (что означает отсутствие взлома _coro и никаких устаревших API).

import asyncio

async def fn():
  await asyncio.sleep(1.5)
  print('fn')

async def main():
    print('main start')
    asyncio.create_task(fn()) # run in parallel
    await asyncio.sleep(0.2)
    print('main end')


def async_run_and_await_all_tasks(main):
  def get_pending_tasks():
      tasks = asyncio.Task.all_tasks()
      pending = [task for task in tasks if task != run_main_task and not task.done()]
      return pending

  async def run_main():
      await main()

      while True:
          pending_tasks = get_pending_tasks()
          if len(pending_tasks) == 0: return
          await asyncio.gather(*pending_tasks)

  loop = asyncio.new_event_loop()
  run_main_coro = run_main()
  run_main_task = loop.create_task(run_main_coro)
  loop.run_until_complete(run_main_task)

# asyncio.run(main()) # doesn't print from fn task, because main finishes earlier
async_run_and_await_all_tasks(main)

вывод (как и ожидалось):

main start
main end
fn

Эта функция async_run_and_await_all_tasks заставит python вести себя в стиле nodejs: выходить только тогда, когда нет незавершенных задач.

Вы также можете рассмотреть возможность использования asyncio.shield, хотя, выполнив этот способ, вы не получите ВСЕ выполняющиеся задачи, а только экранированные. Но это все еще может быть полезно в некоторых сценариях.

Кроме того, начиная с Python 3.7 мы также можем использовать высокоуровневый метод API asynio.run здесь. Как разработчик ядра Python, Юрий Селиванов предлагает: https://youtu.be/ReXxO_azV-w?t=636
Примечание: функция asyncio.run была добавлена ​​в asyncio в Python 3.7 на временной основе.

Надеюсь, это поможет!

import asyncio


async def my_expensive_operation(expense):
    print(await asyncio.sleep(expense, result="Expensive operation finished."))


async def do_something_periodically(expense, interval):
    while True:
        asyncio.create_task(my_expensive_operation(expense))
        # using asyncio.shield
        await asyncio.shield(asyncio.sleep(interval))


coro = do_something_periodically(1, 1)

if __name__ == "__main__":
    try:
        # using asyncio.run
        asyncio.run(coro)
    except KeyboardInterrupt:
        print('Cancelled!')
      import asyncio

async def coroutine_to_run(timetosleepinseconds):
    print(await asyncio.sleep(timetosleepinseconds, result=f"I have finished in {timetosleepinseconds} seconds"))
    ## Do your stuff

async def main():
    tasks = [asyncio.create_task(coroutine_to_run(timetosleepinseconds=2)), asyncio.create_task(coroutine_to_run(timetosleepinseconds=3))]
    await asyncio.gather(*tasks)

asyncio.run(main())

Если вам нужен чистый способ ожидания всех запущенных задач, созданных в некоторой локальной области, без утечки памяти (и при предотвращении ошибок сборки мусора ), вы можете поддерживать набор запущенных задач и использовать task.add_done_callback(...)удалить задачу из набора. Вот класс, который сделает это за вас:

      class TaskSet:
    def __init__(self):
        self.tasks = set()

    def add(self, coroutine: Coroutine) -> Task:
        task = asyncio.create_task(coroutine)
        self.tasks.add(task)
        task.add_done_callback(lambda _: self.tasks.remove(task))
        return task

    def __await__(self):
        return asyncio.gather(*self.tasks).__await__()

Что можно использовать так:

      async def my_function():
    await asyncio.sleep(0.5)


async def go():
    tasks = TaskSet()
    for i in range(10):
        tasks.add(my_function())
    await tasks

В моем варианте использования есть несколько основных задач, которые порождают краткосрочные задачи. Этот ответ сразу же завершается, когда вы видите, что основные задачи завершены (а также некоторые временные задачи), однако я хотел привести в порядок другие задачи. Задержка по времени не сработала бы (так как могут быть созданы дополнительные задачи) так активно используя.cancel()казался правильным выбором.

Код:

      import asyncio

MAX_TASKS = 10
task_maker_count = 0

async def task_maker():
    global task_maker_count
    task_maker_count += 1
    if len(asyncio.all_tasks()) < MAX_TASKS:
        asyncio.create_task(task_maker())
        asyncio.create_task(task_maker())

async def main_task():
    asyncio.create_task(task_maker())
    await asyncio.sleep(2.0)

async def main():
    global task_maker_count
    asyncio.create_task(main_task())
    asyncio.create_task(main_task())

Тест

          await asyncio.gather(*asyncio.all_tasks() - {asyncio.current_task()})
    for task in [*asyncio.all_tasks() - {asyncio.current_task()}]:
        task.cancel()
    await asyncio.gather(*asyncio.all_tasks() - {asyncio.current_task()},
                         return_exceptions=True)  # needed for CancelledError
          print(f'{task_maker_count} task_maker tasks created')

if __name__ == '__main__':
    asyncio.run(main())

Результат на моем компьютере:

      194672 task_maker tasks created

Не имеет особого значения, однако натыкаетсяMAX_TASKSдо тысяч резко сокращает количество выполненных задач.

Другие вопросы по тегам