Как я могу периодически выполнять функцию с asyncio?

Я мигрирую из tornado в asyncioи я не могу найти asyncio эквивалент tornado"s PeriodicCallback, (A PeriodicCallback принимает два аргумента: функцию для запуска и количество миллисекунд между вызовами.)

  • Есть ли такой эквивалент в asyncio?
  • Если нет, то каков будет самый чистый способ реализовать это, не рискуя получить RecursionError спустя некоторое время?

9 ответов

Решение

Для версий Python ниже 3.5:

import asyncio

@asyncio.coroutine
def periodic():
    while True:
        print('periodic')
        yield from asyncio.sleep(1)

def stop():
    task.cancel()

loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass

Для Python 3.5 и выше:

import asyncio

async def periodic():
    while True:
        print('periodic')
        await asyncio.sleep(1)

def stop():
    task.cancel()

loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass

Когда вы чувствуете, что что-то должно произойти "на заднем плане" вашей программы asyncio, asyncio.Task может быть хороший способ сделать это. Вы можете прочитать этот пост, чтобы увидеть, как работать с задачами.

Вот возможная реализация класса, который периодически выполняет некоторую функцию:

import asyncio
from contextlib import suppress


class Periodic:
    def __init__(self, func, time):
        self.func = func
        self.time = time
        self.is_started = False
        self._task = None

    async def start(self):
        if not self.is_started:
            self.is_started = True
            # Start task to call func periodically:
            self._task = asyncio.ensure_future(self._run())

    async def stop(self):
        if self.is_started:
            self.is_started = False
            # Stop task and await it stopped:
            self._task.cancel()
            with suppress(asyncio.CancelledError):
                await self._task

    async def _run(self):
        while True:
            await asyncio.sleep(self.time)
            self.func()

Давайте проверим это:

async def main():
    p = Periodic(lambda: print('test'), 1)
    try:
        print('Start')
        await p.start()
        await asyncio.sleep(3.1)

        print('Stop')
        await p.stop()
        await asyncio.sleep(3.1)

        print('Start')
        await p.start()
        await asyncio.sleep(3.1)
    finally:
        await p.stop()  # we should stop task finally


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Выход:

Start
test
test
test

Stop

Start
test
test
test

[Finished in 9.5s]

Как вы видите на start мы просто запускаем задачу, которая вызывает некоторые функции и некоторое время спит в бесконечном цикле. На stop мы просто отменили эту задачу. Обратите внимание, что эта задача должна быть остановлена ​​в момент завершения программы.

Еще одна важная вещь: ваш обратный вызов не должен занимать много времени (или он заморозит ваш цикл обработки событий). Если вы планируете назвать какой-то долгосрочный funcВам, возможно, потребуется запустить его в executor.

Вариант, который может быть полезен: если вы хотите, чтобы ваш повторяющийся вызов происходил каждые n секунд, а не n секунд между концом последнего выполнения и началом следующего, и вы не хотите, чтобы вызовы перекрывались во времени, следующие проще:

async def repeat(interval, func, *args, **kwargs):
    """Run func every interval seconds.

    If func has not finished before *interval*, will run again
    immediately when the previous iteration finished.

    *args and **kwargs are passed as the arguments to func.
    """
    while True:
        await asyncio.gather(
            func(*args, **kwargs),
            asyncio.sleep(interval),
        )

И пример использования его для запуска пары задач в фоновом режиме:

async def f():
    await asyncio.sleep(1)
    print('Hello')


async def g():
    await asyncio.sleep(0.5)
    print('Goodbye')


async def main():
    t1 = asyncio.ensure_future(repeat(3, f))
    t2 = asyncio.ensure_future(repeat(2, g))
    await t1
    await t2

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Нет встроенной поддержки периодических звонков, нет.

Просто создайте свой собственный цикл планировщика, который спит и выполняет любые запланированные задачи:

import math, time

async def scheduler():
    while True:
        # sleep until the next whole second
        now = time.time()
        await asyncio.sleep(math.ceil(now) - now)

        # execute any scheduled tasks
        await for task in scheduled_tasks(time.time()):
            await task()

scheduled_tasks() Итератор должен создавать задачи, которые готовы к запуску в данный момент. Обратите внимание, что составление расписания и запуск всех заданий теоретически могут занять более 1 секунды; Идея в том, что планировщик выдает все задачи, которые должны были начаться с момента последней проверки.

Альтернативная версия с декоратором для Python 3.7

import asyncio
import time


def periodic(period):
    def scheduler(fcn):

        async def wrapper(*args, **kwargs):

            while True:
                asyncio.create_task(fcn(*args, **kwargs))
                await asyncio.sleep(period)

        return wrapper

    return scheduler


@periodic(2)
async def do_something(*args, **kwargs):
    await asyncio.sleep(5)  # Do some heavy calculation
    print(time.time())


if __name__ == '__main__':
    asyncio.run(do_something('Maluzinha do papai!', secret=42))

Основано на @A. Ответ Jesse Jiryu Davis (с комментариями @Torkel Bjørnson-Langen и @ReWrite) - это улучшение, позволяющее избежать дрейфа.

import time
import asyncio

@asyncio.coroutine
def periodic(period):
    def g_tick():
        t = time.time()
        count = 0
        while True:
            count += 1
            yield max(t + count * period - time.time(), 0)
    g = g_tick()

    while True:
        print('periodic', time.time())
        yield from asyncio.sleep(next(g))

loop = asyncio.get_event_loop()
task = loop.create_task(periodic(1))
loop.call_later(5, task.cancel)

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass

Для нескольких типов планирования я бы рекомендовал APSScheduler с поддержкой asyncio.

Я использую его для простого процесса Python, который я могу запустить с помощью докера, и просто работает как cron, выполняя что-то еженедельно, пока я не убью докер/процесс.

В этом решении используется концепция оформления от user10649957, дрейфующий обходной путь от user2003487 и суперкласс для создания максимально элегантного кода для работы с асинхронными периодическими функциями.

Без резьбы.Thread

Решение состоит из следующих файлов:

  • с базовым классом для вас в подкласс
  • с примером подкласса
  • с примером создания и запуска

Класс в файле:

      import time
import asyncio
import abc

class PeriodicAsyncThread:
    def __init__(self, period):
        self.period = period

    def periodic(self):
        def scheduler(fcn):
            async def wrapper(*args, **kwargs):
                def g_tick():
                    t = time.time()
                    count = 0
                    while True:
                        count += 1
                        yield max(t + count * self.period - time.time(), 0)
                g = g_tick()

                while True:
                    # print('periodic', time.time())
                    asyncio.create_task(fcn(*args, **kwargs))
                    await asyncio.sleep(next(g))
            return wrapper
        return scheduler

    @abc.abstractmethod
    async def run(self, *args, **kwargs):
        return

    def start(self):
        asyncio.run(self.run())

Пример простого подкласса в файле:

      from periodic_async_thread import PeriodicAsyncThread
import time
import asyncio

class APeriodicThread(PeriodicAsyncThread):
    def __init__(self, period):
        super().__init__(period)
        self.run = self.periodic()(self.run)
    
    async def run(self, *args, **kwargs):
        await asyncio.sleep(2)
        print(time.time())

Создание экземпляра и запуск примера класса в файле:

      from a_periodic_thread import APeriodicThread
apt = APeriodicThread(2)
apt.start()

Этот код представляет собой элегантное решение, которое также смягчает проблему временного дрейфа других решений. Вывод аналогичен:

      1642711285.3898764
1642711287.390698
1642711289.3924973
1642711291.3920736

С резьбой.Thread

Решение состоит из следующих файлов:

  • с классом асинхронного потока навеса.
  • с базовым классом для вас в подкласс
  • с примером подкласса
  • с примером создания и запуска

The AsyncThreadкласс в файле async_thread.py:

      from threading import Thread
import asyncio
import abc

class AsyncThread(Thread):
    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

    @abc.abstractmethod
    async def async_run(self, *args, **kwargs):
        pass

    def run(self, *args, **kwargs):
        # loop = asyncio.new_event_loop()
        # asyncio.set_event_loop(loop)

        # loop.run_until_complete(self.async_run(*args, **kwargs))
        # loop.close()
        asyncio.run(self.async_run(*args, **kwargs))

The PeriodicAsyncThreadкласс в файле periodic_async_thread.py:

      import time
import asyncio
from .async_thread import AsyncThread

class PeriodicAsyncThread(AsyncThread):
    def __init__(self, period, *args, **kwargs):
        self.period = period
        super().__init__(*args, **kwargs)
        self.async_run = self.periodic()(self.async_run)

    def periodic(self):
        def scheduler(fcn):
            async def wrapper(*args, **kwargs):
                def g_tick():
                    t = time.time()
                    count = 0
                    while True:
                        count += 1
                        yield max(t + count * self.period - time.time(), 0)
                g = g_tick()

                while True:
                    # print('periodic', time.time())
                    asyncio.create_task(fcn(*args, **kwargs))
                    await asyncio.sleep(next(g))
            return wrapper
        return scheduler

Пример простого подкласса APeriodicThreadв файле a_periodic_thread.py:

      import time
from threading import current_thread
from .periodic_async_thread import PeriodicAsyncThread
import asyncio

class APeriodicAsyncTHread(PeriodicAsyncThread):
    async def async_run(self, *args, **kwargs):
        print(f"{current_thread().name} {time.time()} Hi!")
        await asyncio.sleep(1)
        print(f"{current_thread().name} {time.time()} Bye!")

Создание экземпляра и запуск примера класса в файле run_me.py:

      from .a_periodic_thread import APeriodicAsyncTHread
a = APeriodicAsyncTHread(2, name = "a periodic async thread")
a.start()
a.join()

Этот код представляет собой элегантное решение, которое также смягчает проблему временного дрейфа других решений. Вывод аналогичен:

      a periodic async thread 1643726990.505269 Hi!
a periodic async thread 1643726991.5069854 Bye!
a periodic async thread 1643726992.506919 Hi!
a periodic async thread 1643726993.5089169 Bye!
a periodic async thread 1643726994.5076022 Hi!
a periodic async thread 1643726995.509422 Bye!
a periodic async thread 1643726996.5075526 Hi!
a periodic async thread 1643726997.5093904 Bye!
a periodic async thread 1643726998.5072556 Hi!
a periodic async thread 1643726999.5091035 Bye!

Это то, что я сделал, чтобы проверить свою теорию периодических обратных вызовов с использованием asyncio. У меня нет опыта использования Tornado, поэтому я не уверен, как именно с ним работают периодические обратные вызовы. Я привык использовать after(ms, callback) в Tkinter, и это то, что я придумал. While True:Просто мне кажется уродливым, даже если он асинхронный (в большей степени, чем глобальные). В call_later(s, callback, *args) метод использует секунды, а не миллисекунды.

      import asyncio
my_var = 0
def update_forever(the_loop):
    global my_var
    print(my_var)
    my_var += 1 
    # exit logic could be placed here
    the_loop.call_later(3, update_forever, the_loop)  # the method adds a delayed callback on completion

event_loop = asyncio.get_event_loop()
event_loop.call_soon(update_forever, event_loop)
event_loop.run_forever()
Другие вопросы по тегам