Python threading.timer - повторять функцию каждые 'n' секунд
У меня проблемы с таймером Python и буду очень признателен за советы или помощь:D
Я не слишком хорошо знаю, как работают потоки, но я просто хочу запускать функцию каждые 0,5 секунды и иметь возможность запускать, останавливать и сбрасывать таймер.
Тем не менее, я продолжаю получать RuntimeError: threads can only be started once
когда я выполню threading.timer.start()
дважды. Есть ли обходной путь для этого? Я пытался подать заявку threading.timer.cancel()
перед каждым стартом.
Псевдокод:
t=threading.timer(0.5,function)
while True:
t.cancel()
t.start()
16 ответов
Лучший способ - запустить поток таймера один раз. В вашей ветке таймера вы должны написать следующее
class MyThread(Thread):
def __init__(self, event):
Thread.__init__(self)
self.stopped = event
def run(self):
while not self.stopped.wait(0.5):
print("my thread")
# call a function
В коде, который запустил таймер, вы можете затем set
остановленное событие, чтобы остановить таймер.
stopFlag = Event()
thread = MyThread(stopFlag)
thread.start()
# this will stop the timer
stopFlag.set()
Немного улучшив Ганса Затем ответим, мы можем просто создать подкласс функции Timer. Следующее становится нашим полным кодом "таймера повтора", и его можно использовать в качестве замены для threading.Timer со всеми одинаковыми аргументами:
from threading import Timer
class RepeatTimer(Timer):
def run(self):
while not self.finished.wait(self.interval):
self.function(*self.args, **self.kwargs)
Пример использования:
def dummyfn(msg="foo"):
print(msg)
timer = RepeatTimer(1, dummyfn)
timer.start()
time.sleep(5)
timer.cancel()
производит следующий вывод:
foo
foo
foo
foo
а также
timer = RepeatTimer(1, dummyfn, args=("bar",))
timer.start()
time.sleep(5)
timer.cancel()
производит
bar
bar
bar
bar
Из Эквивалента setInterval в python:
import threading
def setInterval(interval):
def decorator(function):
def wrapper(*args, **kwargs):
stopped = threading.Event()
def loop(): # executed in another thread
while not stopped.wait(interval): # until stopped
function(*args, **kwargs)
t = threading.Thread(target=loop)
t.daemon = True # stop if the program exits
t.start()
return stopped
return wrapper
return decorator
Использование:
@setInterval(.5)
def function():
"..."
stop = function() # start timer, the first call is in .5 seconds
stop.set() # stop the loop
stop = function() # start new timer
# ...
stop.set()
Или вот та же функциональность, но в качестве отдельной функции вместо декоратора:
cancel_future_calls = call_repeatedly(60, print, "Hello, World")
# ...
cancel_future_calls()
Используя таймеры
from threading import Timer,Thread,Event
class perpetualTimer():
def __init__(self,t,hFunction):
self.t=t
self.hFunction = hFunction
self.thread = Timer(self.t,self.handle_function)
def handle_function(self):
self.hFunction()
self.thread = Timer(self.t,self.handle_function)
self.thread.start()
def start(self):
self.thread.start()
def cancel(self):
self.thread.cancel()
def printer():
print 'ipsem lorem'
t = perpetualTimer(5,printer)
t.start()
это может быть остановлено t.cancel()
В интересах предоставления правильного ответа, используя Timer в соответствии с запросом OP, я улучшу ответ swapnil jariwala:
from threading import Timer
import time
class InfiniteTimer():
"""A Timer class that does not stop, unless you want it to."""
def __init__(self, seconds, target):
self._should_continue = False
self.is_running = False
self.seconds = seconds
self.target = target
self.thread = None
def _handle_target(self):
self.is_running = True
self.target()
self.is_running = False
self._start_timer()
def _start_timer(self):
if self._should_continue: # Code could have been running when cancel was called.
self.thread = Timer(self.seconds, self._handle_target)
self.thread.start()
def start(self):
if not self._should_continue and not self.is_running:
self._should_continue = True
self._start_timer()
else:
print("Timer already started or running, please wait if you're restarting.")
def cancel(self):
if self.thread is not None:
self._should_continue = False # Just in case thread is running and cancel fails.
self.thread.cancel()
else:
print("Timer never started or failed to initialize.")
def tick():
print('ipsem lorem')
# Example Usage
t = InfiniteTimer(0.5, tick)
t.start()
Время импорта является необязательным без использования примера.
Я изменил некоторый код в коде swapnil-jariwala, чтобы сделать немного консольных часов.
from threading import Timer, Thread, Event
from datetime import datetime
class perpetualTimer():
def __init__(self, t, hFunction):
self.t = t
self.hFunction = hFunction
self.thread = Timer(self.t, self.handle_function)
def handle_function(self):
self.hFunction()
self.thread = Timer(self.t, self.handle_function)
self.thread.start()
def start(self):
self.thread.start()
def cancel(self):
self.thread.cancel()
def printer():
tempo = datetime.today()
print("{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second))
t = perpetualTimer(1, printer)
t.start()
ВЫХОД
>>> 11:39:11
11:39:12
11:39:13
11:39:14
11:39:15
11:39:16
...
Таймер с графическим интерфейсом tkinter
Этот код помещает таймер часов в маленькое окно с tkinter
from threading import Timer, Thread, Event
from datetime import datetime
import tkinter as tk
app = tk.Tk()
lab = tk.Label(app, text="Timer will start in a sec")
lab.pack()
class perpetualTimer():
def __init__(self, t, hFunction):
self.t = t
self.hFunction = hFunction
self.thread = Timer(self.t, self.handle_function)
def handle_function(self):
self.hFunction()
self.thread = Timer(self.t, self.handle_function)
self.thread.start()
def start(self):
self.thread.start()
def cancel(self):
self.thread.cancel()
def printer():
tempo = datetime.today()
clock = "{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second)
try:
lab['text'] = clock
except RuntimeError:
exit()
t = perpetualTimer(1, printer)
t.start()
app.mainloop()
Пример игры с карточками (вроде)
from threading import Timer, Thread, Event
from datetime import datetime
class perpetualTimer():
def __init__(self, t, hFunction):
self.t = t
self.hFunction = hFunction
self.thread = Timer(self.t, self.handle_function)
def handle_function(self):
self.hFunction()
self.thread = Timer(self.t, self.handle_function)
self.thread.start()
def start(self):
self.thread.start()
def cancel(self):
self.thread.cancel()
x = datetime.today()
start = x.second
def printer():
global questions, counter, start
x = datetime.today()
tempo = x.second
if tempo - 3 > start:
show_ans()
#print("\n{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second), end="")
print()
print("-" + questions[counter])
counter += 1
if counter == len(answers):
counter = 0
def show_ans():
global answers, c2
print("It is {}".format(answers[c2]))
c2 += 1
if c2 == len(answers):
c2 = 0
questions = ["What is the capital of Italy?",
"What is the capital of France?",
"What is the capital of England?",
"What is the capital of Spain?"]
answers = "Rome", "Paris", "London", "Madrid"
counter = 0
c2 = 0
print("Get ready to answer")
t = perpetualTimer(3, printer)
t.start()
выход:
Get ready to answer
>>>
-What is the capital of Italy?
It is Rome
-What is the capital of France?
It is Paris
-What is the capital of England?
...
Я немного опоздал, но вот мои два цента:
Вы можете повторно использовать
threading.Timer
объект, вызвав его
.run()
метод несколько раз, например:
class SomeClassThatNeedsATimer:
def __init__(...):
self.timer = threading.Timer(interval, self.on_timer)
self.timer.start()
def on_timer(self):
print('On timer')
self.timer.run()
Я должен был сделать это для проекта. В итоге я запустил отдельный поток для функции
t = threading.Thread(target =heartbeat, args=(worker,))
t.start()
**** сердцебиение - моя функция, работник - один из моих аргументов ****
внутри моей функции сердцебиения:
def heartbeat(worker):
while True:
time.sleep(5)
#all of my code
Поэтому, когда я запускаю поток, функция будет несколько раз ждать 5 секунд, запускать весь мой код и делать это бесконечно. Если вы хотите убить процесс, просто убейте поток.
В дополнение к приведенным выше отличным ответам с использованием потоков, если вам нужно использовать свой основной поток или предпочитаете асинхронный подход - я обернул короткий класс вокруг класса таймера aio_timers (чтобы включить повторение)
import asyncio
from aio_timers import Timer
class RepeatingAsyncTimer():
def __init__(self, interval, cb, *args, **kwargs):
self.interval = interval
self.cb = cb
self.args = args
self.kwargs = kwargs
self.aio_timer = None
self.start_timer()
def start_timer(self):
self.aio_timer = Timer(delay=self.interval,
callback=self.cb_wrapper,
callback_args=self.args,
callback_kwargs=self.kwargs
)
def cb_wrapper(self, *args, **kwargs):
self.cb(*args, **kwargs)
self.start_timer()
from time import time
def cb(timer_name):
print(timer_name, time())
print(f'clock starts at: {time()}')
timer_1 = RepeatingAsyncTimer(interval=5, cb=cb, timer_name='timer_1')
timer_2 = RepeatingAsyncTimer(interval=10, cb=cb, timer_name='timer_2')
часы начинаются с: 1602438840.9690785
timer_ 1 1602438845.980087
таймер_ 2 1602438850.9806316
таймер_ 1 1602438850.9808934
таймер_ 1 1602438855.9863033
таймер_ 2 1602438860.9868324
таймер_ 1 1602438860.9876585
Я придумал другое решение с классом SingleTon. Подскажите пожалуйста, есть ли здесь утечка памяти.
import time,threading
class Singleton:
__instance = None
sleepTime = 1
executeThread = False
def __init__(self):
if Singleton.__instance != None:
raise Exception("This class is a singleton!")
else:
Singleton.__instance = self
@staticmethod
def getInstance():
if Singleton.__instance == None:
Singleton()
return Singleton.__instance
def startThread(self):
self.executeThread = True
self.threadNew = threading.Thread(target=self.foo_target)
self.threadNew.start()
print('doing other things...')
def stopThread(self):
print("Killing Thread ")
self.executeThread = False
self.threadNew.join()
print(self.threadNew)
def foo(self):
print("Hello in " + str(self.sleepTime) + " seconds")
def foo_target(self):
while self.executeThread:
self.foo()
print(self.threadNew)
time.sleep(self.sleepTime)
if not self.executeThread:
break
sClass = Singleton()
sClass.startThread()
time.sleep(5)
sClass.getInstance().stopThread()
sClass.getInstance().sleepTime = 2
sClass.startThread()
В интересах предоставления правильного ответа с использованием Timer по запросу OP я улучшу /questions/25587178/python-threadingtimer-povtoryat-funktsiyu-kazhdyie-n-sekund/25587209#25587209
from threading import Timer
class InfiniteTimer():
"""A Timer class that does not stop, unless you want it to."""
def __init__(self, seconds, target, args=[], kwargs=dict()):
self._should_continue = False
self.is_running = False
self.seconds = seconds
self.target = target
self.args = args
self.kwargs = kwargs
self.thread = None
def _handle_target(self):
self.is_running = True
self.target(*self.args, **self.kwargs)
self.is_running = False
self._start_timer()
def _start_timer(self):
if self._should_continue: # Code could have been running when cancel was called.
self.thread = Timer(
self.seconds,
self._handle_target,
)
self.thread.start()
def start(self):
if not self._should_continue and not self.is_running:
self._should_continue = True
self._start_timer()
else:
print(
"Timer already started or running, please wait if you're restarting.")
def cancel(self):
if self.thread is not None:
self._should_continue = False # Just in case thread is running and cancel fails.
self.thread.cancel()
else:
print("Timer never started or failed to initialize.")
def tick(i):
print('ipsem lorem', i)
# Example Usage
t = InfiniteTimer(0.5, tick, kwargs=dict(i="i"))
t.start()
Мне нравится ответ right2clicky, особенно в том, что он не требует разрушения потока и создания нового каждый раз, когда срабатывает таймер. Кроме того, легко переопределить создание класса с обратным вызовом таймера, который периодически вызывается. Это мой обычный вариант использования:
class MyClass(RepeatTimer):
def __init__(self, period):
super().__init__(period, self.on_timer)
def on_timer(self):
print("Tick")
if __name__ == "__main__":
mc = MyClass(1)
mc.start()
time.sleep(5)
mc.cancel()
Я реализовал класс, который работает как таймер.
Я оставляю ссылку здесь на тот случай, если она кому-нибудь понадобится: https://github.com/ivanhalencp/python/tree/master/xTimer
from threading import Timer
def TaskManager():
#do stuff
t = Timer( 1, TaskManager )
t.start()
TaskManager()
Вот небольшой пример, он поможет лучше понять, как он работает. Функция taskManager() в конце создает отложенный вызов функции самостоятельно.
Попробуйте изменить переменную "dalay", и вы сможете увидеть разницу
from threading import Timer, _sleep
# ------------------------------------------
DATA = []
dalay = 0.25 # sec
counter = 0
allow_run = True
FIFO = True
def taskManager():
global counter, DATA, delay, allow_run
counter += 1
if len(DATA) > 0:
if FIFO:
print("["+str(counter)+"] new data: ["+str(DATA.pop(0))+"]")
else:
print("["+str(counter)+"] new data: ["+str(DATA.pop())+"]")
else:
print("["+str(counter)+"] no data")
if allow_run:
#delayed method/function call to it self
t = Timer( dalay, taskManager )
t.start()
else:
print(" END task-manager: disabled")
# ------------------------------------------
def main():
DATA.append("data from main(): 0")
_sleep(2)
DATA.append("data from main(): 1")
_sleep(2)
# ------------------------------------------
print(" START task-manager:")
taskManager()
_sleep(2)
DATA.append("first data")
_sleep(2)
DATA.append("second data")
print(" START main():")
main()
print(" END main():")
_sleep(2)
DATA.append("last data")
allow_run = False
Это альтернативная реализация, использующая функцию вместо класса. Вдохновленный @Andrew Wilkins выше.
Поскольку ожидание более точно, чем сон (учитывается время выполнения функции):
import threading
PING_ON = threading.Event()
def ping():
while not PING_ON.wait(1):
print("my thread %s" % str(threading.current_thread().ident))
t = threading.Thread(target=ping)
t.start()
sleep(5)
PING_ON.set()
Это пример кода для непрерывной работы таймера. Просто создайте новый таймер при истощении и вызовите ту же функцию. Не лучший способ сделать это, но можно сделать и так.
import threading
import time
class ContinousTimer():
def __init__(self):
self.timer = None
def run(self, msg='abc'):
print(msg)
self.timer = threading.Timer(interval=2, function=self.run, args=(msg, ))
self.timer.start()
if __name__ == "__main__":
t = ContinousTimer()
try:
t.run(msg="Hello")
while True:
time.sleep(0.1)
except KeyboardInterrupt:
# Cancel Timer
t.timer.cancel()