При использовании asyncio, как вы позволите завершить все запущенные задачи перед завершением цикла событий
У меня есть следующий код:
@asyncio.coroutine
def do_something_periodically():
while True:
asyncio.async(my_expensive_operation())
yield from asyncio.sleep(my_interval)
if shutdown_flag_is_set:
print("Shutting down")
break
Я запускаю эту функцию до завершения. Проблема возникает, когда установлено выключение - функция завершается, и все отложенные задачи никогда не выполняются. (Вы видите это как ошибку
task: <Task pending coro=<report() running at script.py:33> wait_for=<Future pending cb=[Task._wakeup()]>>
). Как правильно запланировать выключение?
Чтобы дать некоторый контекст, я пишу системный монитор, который читает из /proc/stat каждые 5 секунд, вычисляет загрузку процессора за этот период и затем отправляет результат на сервер. Я хочу продолжать планировать эти задания мониторинга до тех пор, пока не получу sigterm, когда я перестану планировать, дождусь завершения всех текущих заданий и выйду изящно.
9 ответов
Вы можете получить незаконченные задачи и снова запустить цикл, пока они не закончатся, затем закрыть цикл или выйти из программы.
pending = asyncio.Task.all_tasks()
loop.run_until_complete(asyncio.gather(*pending))
- pending - это список ожидающих выполнения задач.
- asyncio.gather () позволяет ожидать нескольких задач одновременно.
Если вы хотите убедиться, что все задачи выполнены внутри сопрограммы (возможно, у вас есть "главная" сопрограмма), вы можете сделать это следующим образом, например:
@asyncio.coroutine
def do_something_periodically():
while True:
asyncio.async(my_expensive_operation())
yield from asyncio.sleep(my_interval)
if shutdown_flag_is_set:
print("Shutting down")
break
yield from asyncio.gather(*asyncio.Task.all_tasks())
Кроме того, в этом случае, поскольку все задачи создаются в одной сопрограмме, у вас уже есть доступ к задачам:
@asyncio.coroutine
def do_something_periodically():
tasks = []
while True:
tasks.append(asyncio.async(my_expensive_operation()))
yield from asyncio.sleep(my_interval)
if shutdown_flag_is_set:
print("Shutting down")
break
yield from asyncio.gather(*tasks)
Я заметил некоторые ответы, предложенные с использованием
asyncio.gather(*asyncio.all_tasks())
, но проблема с этим иногда может заключаться в бесконечном цикле, в котором он ожидает
asyncio.current_task()
для завершения, что само по себе. В некоторых ответах предлагались некоторые сложные обходные пути, включая проверку
coro
имена или
len(asyncio.all_tasks())
, но оказывается, что это очень просто сделать, воспользовавшись
set
операции:
async def main():
# Create some tasks.
for _ in range(10):
asyncio.create_task(asyncio.sleep(10))
# Wait for all other tasks to finish other than the current task i.e. main().
await asyncio.gather(*asyncio.all_tasks() - {asyncio.current_task()})
Начиная с Python 3.7, в приведенном выше ответе используются несколько устаревших API (asyncio.async и Task.all_tasks,@asyncio.coroutine, yield from и т. Д.), И вам лучше использовать это:
import asyncio
my_interval = 1
shutdown_flag_is_set = False
async def my_expensive_operation():
print(await asyncio.sleep(10, result="Expensive operation finished."))
async def do_something_periodically(*, loop):
global shutdown_flag_is_set
while True:
loop.create_task(my_expensive_operation())
await asyncio.sleep(my_interval)
if shutdown_flag_is_set:
print("Shutting down")
break
await asyncio.gather(*asyncio.all_tasks(loop))
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(do_something_periodically(loop=loop))
except KeyboardInterrupt:
shutdown_flag_is_set = True
Используйте сопрограмму-оболочку, которая ждет, пока счетчик ожидающих задач не станет 1, прежде чем вернуться.
async def loop_job():
asyncio.create_task(do_something_periodically())
while len(asyncio.Task.all_tasks()) > 1: # Any task besides loop_job() itself?
await asyncio.sleep(0.2)
asyncio.run(loop_job())
Я не уверен, что это то, о чем вы просили, но у меня была аналогичная проблема, и вот окончательное решение, которое я придумал.
Код совместим с Python 3 и использует только общедоступные API-интерфейсы asyncio (что означает отсутствие взлома _coro
и никаких устаревших API).
import asyncio
async def fn():
await asyncio.sleep(1.5)
print('fn')
async def main():
print('main start')
asyncio.create_task(fn()) # run in parallel
await asyncio.sleep(0.2)
print('main end')
def async_run_and_await_all_tasks(main):
def get_pending_tasks():
tasks = asyncio.Task.all_tasks()
pending = [task for task in tasks if task != run_main_task and not task.done()]
return pending
async def run_main():
await main()
while True:
pending_tasks = get_pending_tasks()
if len(pending_tasks) == 0: return
await asyncio.gather(*pending_tasks)
loop = asyncio.new_event_loop()
run_main_coro = run_main()
run_main_task = loop.create_task(run_main_coro)
loop.run_until_complete(run_main_task)
# asyncio.run(main()) # doesn't print from fn task, because main finishes earlier
async_run_and_await_all_tasks(main)
вывод (как и ожидалось):
main start
main end
fn
Эта функция async_run_and_await_all_tasks заставит python вести себя в стиле nodejs: выходить только тогда, когда нет незавершенных задач.
Вы также можете рассмотреть возможность использования asyncio.shield, хотя, выполнив этот способ, вы не получите ВСЕ выполняющиеся задачи, а только экранированные. Но это все еще может быть полезно в некоторых сценариях.
Кроме того, начиная с Python 3.7 мы также можем использовать высокоуровневый метод API asynio.run здесь. Как разработчик ядра Python, Юрий Селиванов предлагает: https://youtu.be/ReXxO_azV-w?t=636
Примечание: функция asyncio.run была добавлена в asyncio в Python 3.7 на временной основе.
Надеюсь, это поможет!
import asyncio
async def my_expensive_operation(expense):
print(await asyncio.sleep(expense, result="Expensive operation finished."))
async def do_something_periodically(expense, interval):
while True:
asyncio.create_task(my_expensive_operation(expense))
# using asyncio.shield
await asyncio.shield(asyncio.sleep(interval))
coro = do_something_periodically(1, 1)
if __name__ == "__main__":
try:
# using asyncio.run
asyncio.run(coro)
except KeyboardInterrupt:
print('Cancelled!')
import asyncio
async def coroutine_to_run(timetosleepinseconds):
print(await asyncio.sleep(timetosleepinseconds, result=f"I have finished in {timetosleepinseconds} seconds"))
## Do your stuff
async def main():
tasks = [asyncio.create_task(coroutine_to_run(timetosleepinseconds=2)), asyncio.create_task(coroutine_to_run(timetosleepinseconds=3))]
await asyncio.gather(*tasks)
asyncio.run(main())
Если вам нужен чистый способ ожидания всех запущенных задач, созданных в некоторой локальной области, без утечки памяти (и при предотвращении ошибок сборки мусора ), вы можете поддерживать набор запущенных задач и использовать
task.add_done_callback(...)
удалить задачу из набора. Вот класс, который сделает это за вас:
class TaskSet:
def __init__(self):
self.tasks = set()
def add(self, coroutine: Coroutine) -> Task:
task = asyncio.create_task(coroutine)
self.tasks.add(task)
task.add_done_callback(lambda _: self.tasks.remove(task))
return task
def __await__(self):
return asyncio.gather(*self.tasks).__await__()
Что можно использовать так:
async def my_function():
await asyncio.sleep(0.5)
async def go():
tasks = TaskSet()
for i in range(10):
tasks.add(my_function())
await tasks
В моем варианте использования есть несколько основных задач, которые порождают краткосрочные задачи. Этот ответ сразу же завершается, когда вы видите, что основные задачи завершены (а также некоторые временные задачи), однако я хотел привести в порядок другие задачи. Задержка по времени не сработала бы (так как могут быть созданы дополнительные задачи) так активно используя.cancel()
казался правильным выбором.
Код:
import asyncio
MAX_TASKS = 10
task_maker_count = 0
async def task_maker():
global task_maker_count
task_maker_count += 1
if len(asyncio.all_tasks()) < MAX_TASKS:
asyncio.create_task(task_maker())
asyncio.create_task(task_maker())
async def main_task():
asyncio.create_task(task_maker())
await asyncio.sleep(2.0)
async def main():
global task_maker_count
asyncio.create_task(main_task())
asyncio.create_task(main_task())
Тест
await asyncio.gather(*asyncio.all_tasks() - {asyncio.current_task()})
for task in [*asyncio.all_tasks() - {asyncio.current_task()}]:
task.cancel()
await asyncio.gather(*asyncio.all_tasks() - {asyncio.current_task()},
return_exceptions=True) # needed for CancelledError
print(f'{task_maker_count} task_maker tasks created')
if __name__ == '__main__':
asyncio.run(main())
Результат на моем компьютере:
194672 task_maker tasks created
Не имеет особого значения, однако натыкаетсяMAX_TASKS
до тысяч резко сокращает количество выполненных задач.