"Запусти и забудь" python async/await

Иногда необходимо выполнить некритическую асинхронную операцию, но я не хочу ждать, пока она завершится. В сопрограммной реализации Торнадо вы можете "запустить и забыть" асинхронную функцию, просто пропустив yield ключевое слово.

Я пытался понять, как "запустить и забыть" с новым async/await синтаксис выпущен в Python 3.5. Например, упрощенный фрагмент кода:

async def async_foo():
    print("Do some stuff asynchronously here...")

def bar():
    async_foo()  # fire and forget "async_foo()"

bar()

Что происходит, хотя это то, что bar() никогда не выполняется, и вместо этого мы получаем предупреждение во время выполнения:

RuntimeWarning: coroutine 'async_foo' was never awaited
  async_foo()  # fire and forget "async_foo()"

6 ответов

Решение

Upd:

замещать asyncio.ensure_future с asyncio.create_task везде, если вы используете Python >= 3.7 Это новый, более хороший способ порождать задачу.


asyncio.Task "уволить и забыть"

В соответствии с документами Python для asyncio.Task можно запустить некоторую сопрограмму для выполнения "в фоновом режиме". Задача, созданная asyncio.ensure_future функция не будет блокировать выполнение (поэтому функция вернется немедленно!). Это похоже на способ "выстрелить и забыть", как вы просили.

import asyncio


async def async_foo():
    print("async_foo started")
    await asyncio.sleep(1)
    print("async_foo done")


async def main():
    asyncio.ensure_future(async_foo())  # fire and forget async_foo()

    # btw, you can also create tasks inside non-async funcs

    print('Do some actions 1')
    await asyncio.sleep(1)
    print('Do some actions 2')
    await asyncio.sleep(1)
    print('Do some actions 3')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Выход:

Do some actions 1
async_foo started
Do some actions 2
async_foo done
Do some actions 3

Что если задачи выполняются после завершения цикла событий?

Обратите внимание, что asyncio ожидает, что задача будет выполнена в тот момент, когда завершится цикл событий. Так что если вы измените main() чтобы:

async def main():
    asyncio.ensure_future(async_foo())  # fire and forget

    print('Do some actions 1')
    await asyncio.sleep(0.1)
    print('Do some actions 2')

Вы получите это предупреждение после завершения программы:

Task was destroyed but it is pending!
task: <Task pending coro=<async_foo() running at [...]

Чтобы предотвратить это, вы можете просто дождаться выполнения всех ожидающих задач после завершения цикла событий:

async def main():
    asyncio.ensure_future(async_foo())  # fire and forget

    print('Do some actions 1')
    await asyncio.sleep(0.1)
    print('Do some actions 2')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    # Let's also finish all running tasks:
    pending = asyncio.Task.all_tasks()
    loop.run_until_complete(asyncio.gather(*pending))

Убивайте задачи, а не ожидайте их

Иногда вы не хотите ждать выполнения задач (например, некоторые задачи могут быть созданы для вечного выполнения). В этом случае вы можете просто отменить () их, а не ждать их:

import asyncio
from contextlib import suppress


async def echo_forever():
    while True:
        print("echo")
        await asyncio.sleep(1)


async def main():
    asyncio.ensure_future(echo_forever())  # fire and forget

    print('Do some actions 1')
    await asyncio.sleep(1)
    print('Do some actions 2')
    await asyncio.sleep(1)
    print('Do some actions 3')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    # Let's also cancel all running tasks:
    pending = asyncio.Task.all_tasks()
    for task in pending:
        task.cancel()
        # Now we should await task to execute it's cancellation.
        # Cancelled task raises asyncio.CancelledError that we can suppress:
        with suppress(asyncio.CancelledError):
            loop.run_until_complete(task)

Выход:

Do some actions 1
echo
Do some actions 2
echo
Do some actions 3
echo

Спасибо Сергей за лаконичный ответ. Вот украшенная версия того же.

import asyncio
import time


def fire_and_forget(f):
    from functools import wraps
    @wraps(f)
    def wrapped(*args, **kwargs):
        loop = asyncio.get_event_loop()
        if callable(f):
            return loop.run_in_executor(None, f, *args, **kwargs)
        else:
            raise TypeError('Task must be a callable')    
    return wrapped


@fire_and_forget
def foo():
    time.sleep(1)
    print("foo() completed")


print("Hello")
foo()
print("I didn't wait for foo()")

Производит

>>> Hello
>>> I didn't wait for foo()
>>> foo() completed

Это не совсем асинхронное выполнение, но, возможно, вам подходит run_in_executor().

def fire_and_forget(task, *args, **kwargs):
    loop = asyncio.get_event_loop()
    if callable(task):
        return loop.run_in_executor(None, task, *args, **kwargs)
    else:    
        raise TypeError('Task must be a callable')

def foo():
    #asynchronous stuff here


fire_and_forget(foo)

По какой-то причине, если вы не можете использовать asyncioто вот реализация с использованием простых потоков. Проверьте мои другие ответы и ответ Сергея.

import threading

def fire_and_forget(f):
    def wrapped():
        threading.Thread(target=f).start()

    return wrapped

@fire_and_forget
def foo():
    time.sleep(1)
    print("foo() completed")

print("Hello")
foo()
print("I didn't wait for foo()")

Существует неуказанная проблема завершения, поскольку «выстрелил и забыл» не говорит, когда действия должны быть завершены (и, в конечном итоге, нужно ли их зависать или уничтожать при завершении программы). Решение — использовать контекстный менеджер. В Python 3.11 это теперь называется TaskGroup.

Более ранняя альтернатива — пакет aiowire . Его контекстный менеджер имеет опцию тайм-аута и использует «батутный» дизайн, позволяющий асинхронным функциям возвращать асинхронные функции. Это позволяет избежать как фоновых потоков, так и бесконечных цепочек асинхронных вызовов.

      def fire_and_forget(f):
    def wrapped(*args, **kwargs):
        threading.Thread(target=functools.partial(f, *args, **kwargs)).start()

    return wrapped

это лучшая версия вышеперечисленного - не использует asyncio

Другие вопросы по тегам