Каков наилучший способ повторно выполнять функцию каждые x секунд в Python?

Я хочу постоянно выполнять функцию в Python каждые 60 секунд навсегда (точно так же, как NSTimer в Objective C). Этот код будет работать как демон и по сути похож на вызов сценария python каждую минуту с использованием cron, но без необходимости его настройки пользователем.

В этом вопросе о cron, реализованном в Python, решение, по-видимому, эффективно просто sleep() на x секунд. Мне не нужны такие расширенные функциональные возможности, так что, возможно, что-то подобное будет работать

while True:
    # Code executed here
    time.sleep(60)

Есть ли предсказуемые проблемы с этим кодом?

23 ответа

Решение

Используйте модуль sched, который реализует планировщик событий общего назначения.

import sched, time
s = sched.scheduler(time.time, time.sleep)
def do_something(sc): 
    print "Doing stuff..."
    # do your stuff
    s.enter(60, 1, do_something, (sc,))

s.enter(60, 1, do_something, (s,))
s.run()

Просто зафиксируйте вашу петлю времени на системные часы. Легко.

import time
starttime=time.time()
while True:
  print "tick"
  time.sleep(60.0 - ((time.time() - starttime) % 60.0))

Если вы хотите, чтобы неблокирующий способ выполнял вашу функцию периодически, вместо блокирующего бесконечного цикла, я бы использовал потоковый таймер. Таким образом, ваш код может продолжать работать и выполнять другие задачи, при этом ваша функция вызывается каждые n секунд. Я часто использую эту технику для распечатки информации о проделанной работе в длинных задачах с ЦП, дисками и сетью.

Вот код, который я разместил в похожем вопросе с элементами start() и stop():

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.interval   = interval
        self.function   = function
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

Использование:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

Особенности:

  • Только стандартная библиотека, без внешних зависимостей
  • start() а также stop() безопасно звонить несколько раз, даже если таймер уже запущен / остановлен
  • вызываемая функция может иметь позиционные и именованные аргументы
  • Ты можешь измениться interval в любое время, это будет эффективно после следующего запуска. То же самое для args, kwargs и даже function!

Возможно, вы захотите рассмотреть Twisted - сетевую библиотеку Python, которая реализует шаблон Reactor.

from twisted.internet import task, reactor

timeout = 60.0 # Sixty seconds

def doWork():
    #do work here
    pass

l = task.LoopingCall(doWork)
l.start(timeout) # call every sixty seconds

reactor.run()

Хотя "while True: sleep(60)", вероятно, будет работать, Twisted, вероятно, уже реализует многие функции, которые вам в конечном итоге понадобятся (демонизация, ведение журнала или обработка исключений, как указано в bobince), и, вероятно, будет более надежным решением.

Вот обновление кода от MestreLion, которое позволяет избежать дрейфа с течением времени:

import threading 
import time

class RepeatedTimer(object):
  def __init__(self, interval, function, *args, **kwargs):
    self._timer = None
    self.interval = interval
    self.function = function
    self.args = args
    self.kwargs = kwargs
    self.is_running = False
    self.next_call = time.time()
    self.start()

  def _run(self):
    self.is_running = False
    self.start()
    self.function(*self.args, **self.kwargs)

  def start(self):
    if not self.is_running:
      self.next_call += self.interval
      self._timer = threading.Timer(self.next_call - time.time(), self._run)
      self._timer.start()
      self.is_running = True

  def stop(self):
    self._timer.cancel()
    self.is_running = False
import time, traceback

def every(delay, task):
  next_time = time.time() + delay
  while True:
    time.sleep(max(0, next_time - time.time()))
    try:
      task()
    except Exception:
      traceback.print_exc()
      # in production code you might want to have this instead of course:
      # logger.exception("Problem while executing repetitive task.")
    # skip tasks if we are behind schedule:
    next_time += (time.time() - next_time) // delay * delay + delay

def foo():
  print("foo", time.time())

every(5, foo)

Если вы хотите сделать это без блокировки оставшегося кода, вы можете использовать его, чтобы запустить в своем собственном потоке:

import threading
threading.Thread(target=lambda: every(5, foo)).start()

Это решение сочетает в себе несколько функций, редко встречающихся в других решениях:

  • Обработка исключений: насколько это возможно на этом уровне, исключения обрабатываются должным образом, т.е. регистрируются в целях отладки без прерывания нашей программы.
  • Нет цепочки: общая цепочечная реализация (для планирования следующего события), которую вы найдете во многих ответах, является хрупкой в ​​том аспекте, что если что-то пойдет не так в механизме планирования (threading.Timer или что-то еще), это приведет к разрыву цепи. Никаких дальнейших казней не произойдет, даже если причина проблемы уже устранена. Простой цикл и ожидание с простым sleep() гораздо надежнее по сравнению.
  • Без дрейфа: мое решение точно отслеживает время, в которое оно должно работать. Нет смещения в зависимости от времени выполнения (как и во многих других решениях).
  • Пропуск: Мое решение будет пропускать задачи, если одно выполнение занимало слишком много времени (например, делайте X каждые пять секунд, но X занимало 6 секунд). Это стандартное поведение cron (и не без оснований). Многие другие решения затем просто выполняют задачу несколько раз подряд без какой-либо задержки. В большинстве случаев (например, задачи по очистке) это нежелательно. Если хотите, просто используйте next_time += delay вместо.

Более простой способ, которым я верю:

import time

def executeSomething():
    #code here
    time.sleep(60)

while True:
    executeSomething()

Таким образом, ваш код выполняется, затем он ждет 60 секунд, затем он выполняется снова, ждет, выполняет и т.д.... Не нужно усложнять вещи:D

В итоге я использовал модуль расписания. API хороший.

import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

Альтернативным гибким решением является Apscheduler.

pip install apscheduler
from apscheduler.schedulers.background import BlockingScheduler
def print_t():
  pass

sched = BlockingScheduler()
sched.add_job(print_t, 'interval', seconds =60) #will do the print_t work for every 60 seconds

sched.start()

Кроме того, apscheduler предоставляет так много планировщиков, как показано ниже.

  • BlockingScheduler: используйте, когда планировщик - единственное, что работает в вашем процессе

  • BackgroundScheduler: используйте, когда вы не используете ни одну из перечисленных ниже структур и хотите, чтобы планировщик работал в фоновом режиме внутри вашего приложения.

  • AsyncIOScheduler: используйте, если ваше приложение использует модуль asyncio

  • GeventScheduler: используйте, если ваше приложение использует gevent

  • TornadoScheduler: используйте, если вы создаете приложение Tornado

  • TwistedScheduler: используйте, если вы создаете приложение Twisted

  • QtScheduler: используйте, если вы создаете приложение Qt

Я столкнулся с подобной проблемой некоторое время назад. Может быть, http://cronus.readthedocs.org/ может помочь?

Для v0.2 работает следующий фрагмент

import cronus.beat as beat

beat.set_rate(2) # 2 Hz
while beat.true():
    # do some time consuming work here
    beat.sleep() # total loop duration would be 0.5 sec

Основное различие между этим и cron заключается в том, что исключение убьет демона навсегда. Вы могли бы хотеть обернуть с ловушкой исключения и регистратором.

Это кажется намного проще, чем принятое решение - есть ли у него недостатки, которые я не рассматриваю? Пришел сюда в поисках простой копии макаронных изделий.

import threading, time

def print_every_n_seconds(n=2):
    while True:
        print(time.ctime())
        time.sleep(n)
    
thread = threading.Thread(target=print_every_n_seconds, daemon=True)
thread.start()

Который асинхронно выводит.

#Tue Oct 16 17:29:40 2018
#Tue Oct 16 17:29:42 2018
#Tue Oct 16 17:29:44 2018

В этом есть дрейф. Если выполнение задачи занимает значительное количество времени, то интервал становится 2 секунды + время задачи, поэтому, если вам нужно точное планирование, это не для вас.**

Обратите внимание daemon=TrueФлаг означает, что этот поток не будет блокировать завершение работы приложения. Например, возникла проблема, когдаpytest зависает на неопределенное время после запуска тестов, ожидая прекращения этого действия.

Просто используйте

      import time

while True:
    print("this will run after every 30 sec")
    #Your code here
    time.sleep(30)

Один из возможных ответов:

import time
t=time.time()

while True:
    if time.time()-t>10:
        #run your task here
        t=time.time()

Вот адаптированная версия к коду от MestreLion. В дополнение к исходной функции этот код:

1) добавить first_interval, используемый для запуска таймера в определенное время (вызывающему необходимо вычислить first_interval и передать)

2) решить условие гонки в оригинальном коде. В исходном коде, если потоку управления не удалось отменить запущенный таймер ("Остановите таймер и отмените выполнение действия таймера. Это будет работать только в том случае, если таймер все еще находится в стадии ожидания". Цитируется по https://docs.python.org/2/library/threading.html), таймер будет работать бесконечно.

class RepeatedTimer(object):
def __init__(self, first_interval, interval, func, *args, **kwargs):
    self.timer      = None
    self.first_interval = first_interval
    self.interval   = interval
    self.func   = func
    self.args       = args
    self.kwargs     = kwargs
    self.running = False
    self.is_started = False

def first_start(self):
    try:
        # no race-condition here because only control thread will call this method
        # if already started will not start again
        if not self.is_started:
            self.is_started = True
            self.timer = Timer(self.first_interval, self.run)
            self.running = True
            self.timer.start()
    except Exception as e:
        log_print(syslog.LOG_ERR, "timer first_start failed %s %s"%(e.message, traceback.format_exc()))
        raise

def run(self):
    # if not stopped start again
    if self.running:
        self.timer = Timer(self.interval, self.run)
        self.timer.start()
    self.func(*self.args, **self.kwargs)

def stop(self):
    # cancel current timer in case failed it's still OK
    # if already stopped doesn't matter to stop again
    if self.timer:
        self.timer.cancel()
    self.running = False

Я использую метод Tkinter after(), который не "крадет игру" (как модуль sched, который был представлен ранее), т.е. он позволяет другим вещам работать параллельно:

import Tkinter

def do_something1():
  global n1
  n1 += 1
  if n1 == 6: # (Optional condition)
    print "* do_something1() is done *"; return
  # Do your stuff here
  # ...
  print "do_something1() "+str(n1)
  tk.after(1000, do_something1)

def do_something2(): 
  global n2
  n2 += 1
  if n2 == 6: # (Optional condition)
    print "* do_something2() is done *"; return
  # Do your stuff here
  # ...
  print "do_something2() "+str(n2)
  tk.after(500, do_something2)

tk = Tkinter.Tk(); 
n1 = 0; n2 = 0
do_something1()
do_something2()
tk.mainloop()

do_something1() а также do_something2() может работать параллельно и с любым интервалом скорости. Здесь 2-й будет выполнен вдвое быстрее. Обратите внимание, что я использовал простой счетчик в качестве условия для завершения любой функции. Вы можете использовать любое другое условие, которое вам нравится, или нет, если вы хотите выполнить функцию до завершения программы (например, часы).

Вот еще одно решение без использования дополнительных библиотек.

def delay_until(condition_fn, interval_in_sec, timeout_in_sec):
    """Delay using a boolean callable function.

    `condition_fn` is invoked every `interval_in_sec` until `timeout_in_sec`.
    It can break early if condition is met.

    Args:
        condition_fn     - a callable boolean function
        interval_in_sec  - wait time between calling `condition_fn`
        timeout_in_sec   - maximum time to run

    Returns: None
    """
    start = last_call = time.time()
    while time.time() - start < timeout_in_sec:
        if (time.time() - last_call) > interval_in_sec:
            if condition_fn() is True:
                break
            last_call = time.time()

Я наткнулся на это, пытаясь писать скрипты для контроллеров FL Studio. Я даже не программирую на Python(программист C++). Но основные ответы неясны, поэтому я добавлю это.

Использование шаблона программирования разработчика игр для временных рамок. Обычно используется для блокировки выполнения частей цикла обновлений игры через определенный интервал времени. Например, чтобы обновление физического движка было независимым от рендеринга.

Основная идея заключается в следующем:

  1. Вычислите разницу между текущим временем и последним из этого «кадра» (или вызовите свою функцию).
  2. Затем накапливайте разницу во времени между кадрами.
  3. Если накопленная разница во времени превышает целевой временной интервал, мы выполняем код.
  4. Вы также можете при желании ответить и узнать, задерживается ли ваша функция.
      CurrentTime = time.monotonic()
AccumDiff = 0

def Update():
    global CurrentTime
    global AccumDiff

    LastTime = CurrentTime            # Store previous frame time
    CurrentTime = time.monotonic()    # Get the current frame time
    TimeDiff = CurrentTime - LastTime # Calculate the difference
    AccumDiff += TimeDiff             # Accumulate the difference in time

    # Do something every second
    if AccumDiff >= 1.0: # A second passed!
        Overtime = AccumDiff - 1.0 # (optional) Respond to overtime
        print("This print is delayed by:", Overtime ,"Seconds!")  
        
        # Reset our accumulated time
        AccumDiff = 0
        # Do something every X seconds
        print("Printing............")
    else: # A second has not passed yet.
        # Continue accumulating time (pass or do something in response)
        print("Not gonna print yet.")
    

Например, Показать текущее местное время

import datetime
import glib
import logger

def get_local_time():
    current_time = datetime.datetime.now().strftime("%H:%M")
    logger.info("get_local_time(): %s",current_time)
    return str(current_time)

def display_local_time():
    logger.info("Current time is: %s", get_local_time())
    return True

# call every minute
glib.timeout_add(60*1000, display_local_time)
    ''' tracking number of times it prints'''
import threading

global timeInterval
count=0
def printit():
  threading.Timer(timeInterval, printit).start()
  print( "Hello, World!")
  global count
  count=count+1
  print(count)
printit

if __name__ == "__main__":
    timeInterval= int(input('Enter Time in Seconds:'))
    printit()

Я использую это, чтобы вызвать 60 событий в час с большинством событий, происходящих в то же самое количество секунд после целой минуты:

import math
import time
import random

TICK = 60 # one minute tick size
TICK_TIMING = 59 # execute on 59th second of the tick
TICK_MINIMUM = 30 # minimum catch up tick size when lagging

def set_timing():

    now = time.time()
    elapsed = now - info['begin']
    minutes = math.floor(elapsed/TICK)
    tick_elapsed = now - info['completion_time']
    if (info['tick']+1) > minutes:
        wait = max(0,(TICK_TIMING-(time.time() % TICK)))
        print ('standard wait: %.2f' % wait)
        time.sleep(wait)
    elif tick_elapsed < TICK_MINIMUM:
        wait = TICK_MINIMUM-tick_elapsed
        print ('minimum wait: %.2f' % wait)
        time.sleep(wait)
    else:
        print ('skip set_timing(); no wait')
    drift = ((time.time() - info['begin']) - info['tick']*TICK -
        TICK_TIMING + info['begin']%TICK)
    print ('drift: %.6f' % drift)

info['tick'] = 0
info['begin'] = time.time()
info['completion_time'] = info['begin'] - TICK

while 1:

    set_timing()

    print('hello world')

    #random real world event
    time.sleep(random.random()*TICK_MINIMUM)

    info['tick'] += 1
    info['completion_time'] = time.time()

В зависимости от реальных условий вы можете получить отметки длины:

60,60,62,58,60,60,120,30,30,60,60,60,60,60...etc.

но через 60 минут у вас будет 60 тиков; и большинство из них будет происходить с правильным смещением до минуты, которую вы предпочитаете.

В моей системе я получаю типичный дрейф < 1/20 секунды, пока не возникнет необходимость в коррекции.

Преимущество этого метода - разрешение смещения часов; что может вызвать проблемы, если вы делаете такие вещи, как добавление одного элемента за тик, и ожидаете, что 60 элементов добавляются в час. Отказ от учета дрейфа может привести к тому, что вторичные индикаторы, такие как скользящие средние, будут рассматривать данные слишком глубоко в прошлом, что приведет к неправильному выводу.

timed-count может делать это с высокой точностью (т. е. < 1 мс), поскольку он синхронизирован с системными часами. Он не будет дрейфовать со временем и не зависит от продолжительности времени выполнения кода (при условии, что оно меньше периода интервала).

Простой блокирующий пример:

      from timed_count import timed_count

for count in timed_count(60):
    # Execute code here exactly every 60 seconds
    ...

Вы можете легко сделать его неблокирующим , запустив его в потоке:

      from threading import Thread
from timed_count import timed_count

def periodic():
    for count in timed_count(60):
        # Execute code here exactly every 60 seconds
        ...

thread = Thread(target=periodic)
thread.start()

Я думаю, это зависит от того, что вы хотите сделать, и в вашем вопросе не было указано много деталей.

Я хочу проделать дорогостоящую операцию в одном из моих уже многопоточных процессов. Итак, у меня есть этот лидерский процесс, проверяющий время, и только она выполняет дорогостоящую операцию (контрольную точку в модели глубокого обучения). Для этого я увеличиваю счетчик, чтобы убедиться, что прошло 5, затем 10, а затем 15 секунд для сохранения каждые 5 секунд (или используйте модульную арифметику с math.floor):

      def print_every_5_seconds_have_passed_exit_eventually():
    """
    https://stackoverflow.com/questions/3393612/run-certain-code-every-n-seconds
    https://stackoverflow.com/questions/474528/what-is-the-best-way-to-repeatedly-execute-a-function-every-x-seconds
    :return:
    """
    opts = argparse.Namespace(start=time.time())
    next_time_to_print = 0
    while True:
        current_time_passed = time.time() - opts.start
        if current_time_passed >= next_time_to_print:
            next_time_to_print += 5
            print(f'worked and {current_time_passed=}')
            print(f'{current_time_passed % 5=}')
            print(f'{math.floor(current_time_passed % 5) == 0}')
      starting __main__ at __init__
worked and current_time_passed=0.0001709461212158203
current_time_passed % 5=0.0001709461212158203
True
worked and current_time_passed=5.0
current_time_passed % 5=0.0
True
worked and current_time_passed=10.0
current_time_passed % 5=0.0
True
worked and current_time_passed=15.0
current_time_passed % 5=0.0
True

Для меня проверка оператора if - это то, что мне нужно. Наличие потоков и планировщиков в моем уже сложном многопроцессорном коде с несколькими графическими процессорами - это не сложность, которую я хочу добавить, если я могу этого избежать, а, кажется, могу. Проверяя идентификатор рабочего, легко убедиться, что это делает только 1 процесс.

Обратите внимание: я использовал операторы True print, чтобы действительно убедиться, что модульный арифметический трюк работает, поскольку проверка точного времени явно не сработает! Но, к моему приятному удивлению, пол сделал свое дело.

Другие вопросы по тегам