Как я могу периодически выполнять функцию с asyncio?
Я мигрирую из tornado
в asyncio
и я не могу найти asyncio
эквивалент tornado
"s PeriodicCallback
, (A PeriodicCallback
принимает два аргумента: функцию для запуска и количество миллисекунд между вызовами.)
- Есть ли такой эквивалент в
asyncio
? - Если нет, то каков будет самый чистый способ реализовать это, не рискуя получить
RecursionError
спустя некоторое время?
9 ответов
Для версий Python ниже 3.5:
import asyncio
@asyncio.coroutine
def periodic():
while True:
print('periodic')
yield from asyncio.sleep(1)
def stop():
task.cancel()
loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())
try:
loop.run_until_complete(task)
except asyncio.CancelledError:
pass
Для Python 3.5 и выше:
import asyncio
async def periodic():
while True:
print('periodic')
await asyncio.sleep(1)
def stop():
task.cancel()
loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())
try:
loop.run_until_complete(task)
except asyncio.CancelledError:
pass
Когда вы чувствуете, что что-то должно произойти "на заднем плане" вашей программы asyncio, asyncio.Task
может быть хороший способ сделать это. Вы можете прочитать этот пост, чтобы увидеть, как работать с задачами.
Вот возможная реализация класса, который периодически выполняет некоторую функцию:
import asyncio
from contextlib import suppress
class Periodic:
def __init__(self, func, time):
self.func = func
self.time = time
self.is_started = False
self._task = None
async def start(self):
if not self.is_started:
self.is_started = True
# Start task to call func periodically:
self._task = asyncio.ensure_future(self._run())
async def stop(self):
if self.is_started:
self.is_started = False
# Stop task and await it stopped:
self._task.cancel()
with suppress(asyncio.CancelledError):
await self._task
async def _run(self):
while True:
await asyncio.sleep(self.time)
self.func()
Давайте проверим это:
async def main():
p = Periodic(lambda: print('test'), 1)
try:
print('Start')
await p.start()
await asyncio.sleep(3.1)
print('Stop')
await p.stop()
await asyncio.sleep(3.1)
print('Start')
await p.start()
await asyncio.sleep(3.1)
finally:
await p.stop() # we should stop task finally
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Выход:
Start
test
test
test
Stop
Start
test
test
test
[Finished in 9.5s]
Как вы видите на start
мы просто запускаем задачу, которая вызывает некоторые функции и некоторое время спит в бесконечном цикле. На stop
мы просто отменили эту задачу. Обратите внимание, что эта задача должна быть остановлена в момент завершения программы.
Еще одна важная вещь: ваш обратный вызов не должен занимать много времени (или он заморозит ваш цикл обработки событий). Если вы планируете назвать какой-то долгосрочный func
Вам, возможно, потребуется запустить его в executor.
Вариант, который может быть полезен: если вы хотите, чтобы ваш повторяющийся вызов происходил каждые n секунд, а не n секунд между концом последнего выполнения и началом следующего, и вы не хотите, чтобы вызовы перекрывались во времени, следующие проще:
async def repeat(interval, func, *args, **kwargs):
"""Run func every interval seconds.
If func has not finished before *interval*, will run again
immediately when the previous iteration finished.
*args and **kwargs are passed as the arguments to func.
"""
while True:
await asyncio.gather(
func(*args, **kwargs),
asyncio.sleep(interval),
)
И пример использования его для запуска пары задач в фоновом режиме:
async def f():
await asyncio.sleep(1)
print('Hello')
async def g():
await asyncio.sleep(0.5)
print('Goodbye')
async def main():
t1 = asyncio.ensure_future(repeat(3, f))
t2 = asyncio.ensure_future(repeat(2, g))
await t1
await t2
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Нет встроенной поддержки периодических звонков, нет.
Просто создайте свой собственный цикл планировщика, который спит и выполняет любые запланированные задачи:
import math, time
async def scheduler():
while True:
# sleep until the next whole second
now = time.time()
await asyncio.sleep(math.ceil(now) - now)
# execute any scheduled tasks
await for task in scheduled_tasks(time.time()):
await task()
scheduled_tasks()
Итератор должен создавать задачи, которые готовы к запуску в данный момент. Обратите внимание, что составление расписания и запуск всех заданий теоретически могут занять более 1 секунды; Идея в том, что планировщик выдает все задачи, которые должны были начаться с момента последней проверки.
Альтернативная версия с декоратором для Python 3.7
import asyncio
import time
def periodic(period):
def scheduler(fcn):
async def wrapper(*args, **kwargs):
while True:
asyncio.create_task(fcn(*args, **kwargs))
await asyncio.sleep(period)
return wrapper
return scheduler
@periodic(2)
async def do_something(*args, **kwargs):
await asyncio.sleep(5) # Do some heavy calculation
print(time.time())
if __name__ == '__main__':
asyncio.run(do_something('Maluzinha do papai!', secret=42))
Основано на @A. Ответ Jesse Jiryu Davis (с комментариями @Torkel Bjørnson-Langen и @ReWrite) - это улучшение, позволяющее избежать дрейфа.
import time
import asyncio
@asyncio.coroutine
def periodic(period):
def g_tick():
t = time.time()
count = 0
while True:
count += 1
yield max(t + count * period - time.time(), 0)
g = g_tick()
while True:
print('periodic', time.time())
yield from asyncio.sleep(next(g))
loop = asyncio.get_event_loop()
task = loop.create_task(periodic(1))
loop.call_later(5, task.cancel)
try:
loop.run_until_complete(task)
except asyncio.CancelledError:
pass
Для нескольких типов планирования я бы рекомендовал APSScheduler с поддержкой asyncio.
Я использую его для простого процесса Python, который я могу запустить с помощью докера, и просто работает как cron, выполняя что-то еженедельно, пока я не убью докер/процесс.
В этом решении используется концепция оформления от user10649957, дрейфующий обходной путь от user2003487 и суперкласс для создания максимально элегантного кода для работы с асинхронными периодическими функциями.
Без резьбы.Thread
Решение состоит из следующих файлов:
- с базовым классом для вас в подкласс
- с примером подкласса
- с примером создания и запуска
Класс в файле:
import time
import asyncio
import abc
class PeriodicAsyncThread:
def __init__(self, period):
self.period = period
def periodic(self):
def scheduler(fcn):
async def wrapper(*args, **kwargs):
def g_tick():
t = time.time()
count = 0
while True:
count += 1
yield max(t + count * self.period - time.time(), 0)
g = g_tick()
while True:
# print('periodic', time.time())
asyncio.create_task(fcn(*args, **kwargs))
await asyncio.sleep(next(g))
return wrapper
return scheduler
@abc.abstractmethod
async def run(self, *args, **kwargs):
return
def start(self):
asyncio.run(self.run())
Пример простого подкласса в файле:
from periodic_async_thread import PeriodicAsyncThread
import time
import asyncio
class APeriodicThread(PeriodicAsyncThread):
def __init__(self, period):
super().__init__(period)
self.run = self.periodic()(self.run)
async def run(self, *args, **kwargs):
await asyncio.sleep(2)
print(time.time())
Создание экземпляра и запуск примера класса в файле:
from a_periodic_thread import APeriodicThread
apt = APeriodicThread(2)
apt.start()
Этот код представляет собой элегантное решение, которое также смягчает проблему временного дрейфа других решений. Вывод аналогичен:
1642711285.3898764
1642711287.390698
1642711289.3924973
1642711291.3920736
С резьбой.Thread
Решение состоит из следующих файлов:
- с классом асинхронного потока навеса.
- с базовым классом для вас в подкласс
- с примером подкласса
- с примером создания и запуска
The
AsyncThread
класс в файле
async_thread.py
:
from threading import Thread
import asyncio
import abc
class AsyncThread(Thread):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
@abc.abstractmethod
async def async_run(self, *args, **kwargs):
pass
def run(self, *args, **kwargs):
# loop = asyncio.new_event_loop()
# asyncio.set_event_loop(loop)
# loop.run_until_complete(self.async_run(*args, **kwargs))
# loop.close()
asyncio.run(self.async_run(*args, **kwargs))
The
PeriodicAsyncThread
класс в файле
periodic_async_thread.py
:
import time
import asyncio
from .async_thread import AsyncThread
class PeriodicAsyncThread(AsyncThread):
def __init__(self, period, *args, **kwargs):
self.period = period
super().__init__(*args, **kwargs)
self.async_run = self.periodic()(self.async_run)
def periodic(self):
def scheduler(fcn):
async def wrapper(*args, **kwargs):
def g_tick():
t = time.time()
count = 0
while True:
count += 1
yield max(t + count * self.period - time.time(), 0)
g = g_tick()
while True:
# print('periodic', time.time())
asyncio.create_task(fcn(*args, **kwargs))
await asyncio.sleep(next(g))
return wrapper
return scheduler
Пример простого подкласса
APeriodicThread
в файле
a_periodic_thread.py
:
import time
from threading import current_thread
from .periodic_async_thread import PeriodicAsyncThread
import asyncio
class APeriodicAsyncTHread(PeriodicAsyncThread):
async def async_run(self, *args, **kwargs):
print(f"{current_thread().name} {time.time()} Hi!")
await asyncio.sleep(1)
print(f"{current_thread().name} {time.time()} Bye!")
Создание экземпляра и запуск примера класса в файле
run_me.py
:
from .a_periodic_thread import APeriodicAsyncTHread
a = APeriodicAsyncTHread(2, name = "a periodic async thread")
a.start()
a.join()
Этот код представляет собой элегантное решение, которое также смягчает проблему временного дрейфа других решений. Вывод аналогичен:
a periodic async thread 1643726990.505269 Hi!
a periodic async thread 1643726991.5069854 Bye!
a periodic async thread 1643726992.506919 Hi!
a periodic async thread 1643726993.5089169 Bye!
a periodic async thread 1643726994.5076022 Hi!
a periodic async thread 1643726995.509422 Bye!
a periodic async thread 1643726996.5075526 Hi!
a periodic async thread 1643726997.5093904 Bye!
a periodic async thread 1643726998.5072556 Hi!
a periodic async thread 1643726999.5091035 Bye!
Это то, что я сделал, чтобы проверить свою теорию периодических обратных вызовов с использованием asyncio. У меня нет опыта использования Tornado, поэтому я не уверен, как именно с ним работают периодические обратные вызовы. Я привык использовать
after(ms, callback)
в Tkinter, и это то, что я придумал.
While True:
Просто мне кажется уродливым, даже если он асинхронный (в большей степени, чем глобальные). В
call_later(s, callback, *args)
метод использует секунды, а не миллисекунды.
import asyncio
my_var = 0
def update_forever(the_loop):
global my_var
print(my_var)
my_var += 1
# exit logic could be placed here
the_loop.call_later(3, update_forever, the_loop) # the method adds a delayed callback on completion
event_loop = asyncio.get_event_loop()
event_loop.call_soon(update_forever, event_loop)
event_loop.run_forever()