Tkinter: как использовать потоки, чтобы предотвратить "зависание" цикла главных событий
У меня небольшой графический тест с кнопкой "Пуск" и индикатором выполнения. Желаемое поведение:
- Нажмите Пуск
- Прогрессбар колеблется в течение 5 секунд
- Прогрессбар останавливается
Наблюдаемое поведение заключается в том, что кнопка "Пуск" зависает на 5 секунд, затем отображается индикатор выполнения (без колебаний).
Вот мой код до сих пор:
class GUI:
def __init__(self, master):
self.master = master
self.test_button = Button(self.master, command=self.tb_click)
self.test_button.configure(
text="Start", background="Grey",
padx=50
)
self.test_button.pack(side=TOP)
def progress(self):
self.prog_bar = ttk.Progressbar(
self.master, orient="horizontal",
length=200, mode="indeterminate"
)
self.prog_bar.pack(side=TOP)
def tb_click(self):
self.progress()
self.prog_bar.start()
# Simulate long running process
t = threading.Thread(target=time.sleep, args=(5,))
t.start()
t.join()
self.prog_bar.stop()
root = Tk()
root.title("Test Button")
main_ui = GUI(root)
root.mainloop()
Основываясь на информации Брайана Оукли, я понимаю, что мне нужно использовать темы. Я попытался создать поток, но я предполагаю, что поскольку поток запускается из основного потока, это не помогает.
У меня была идея поместить логическую часть в другой класс и создать экземпляр GUI из этого класса, аналогично примеру кода А. Родаса здесь.
Мой вопрос:
Я не могу понять, как его кодировать, чтобы эта команда:
self.test_button = Button(self.master, command=self.tb_click)
вызывает функцию, которая находится в другом классе. Это плохо или вообще возможно? Как бы я создал 2-й класс, который может обрабатывать self.tb_click? Я попытался следовать примеру кода А. Родаса, который прекрасно работает. Но я не могу понять, как реализовать его решение в случае виджета Button, который запускает действие.
Если бы мне вместо этого пришлось обрабатывать поток из одного класса GUI, как бы создать поток, который не мешает основному потоку?
5 ответов
Когда вы присоединяетесь к новому потоку в основном потоке, он будет ждать окончания потока, поэтому графический интерфейс будет блокироваться, даже если вы используете многопоточность.
Если вы хотите поместить логическую часть в другой класс, вы можете напрямую создать подкласс Thread, а затем запустить новый объект этого класса при нажатии кнопки. Конструктор этого подкласса Thread может получить объект Queue, а затем вы сможете связать его с частью GUI. Итак, мое предложение:
- Создайте объект Queue в главном потоке
- Создать новый поток с доступом к этой очереди
- Периодически проверять очередь в основном потоке
Затем вам нужно решить проблему того, что происходит, если пользователь нажимает два раза одну и ту же кнопку (при каждом щелчке она создает новую ветку), но вы можете это исправить, отключив кнопку запуска и включив ее снова после вызова self.prog_bar.stop()
,
import Queue
class GUI:
# ...
def tb_click(self):
self.progress()
self.prog_bar.start()
self.queue = Queue.Queue()
ThreadedTask(self.queue).start()
self.master.after(100, self.process_queue)
def process_queue(self):
try:
msg = self.queue.get(0)
# Show result of the task if needed
self.prog_bar.stop()
except Queue.Empty:
self.master.after(100, self.process_queue)
class ThreadedTask(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
def run(self):
time.sleep(5) # Simulate long running process
self.queue.put("Task finished")
Я предоставлю основание для альтернативного решения. Он не является специфическим для индикатора выполнения Tk как такового, но он, безусловно, может быть очень легко реализован для этого.
Вот некоторые классы, которые позволяют вам запускать другие задачи в фоновом режиме Tk, обновлять элементы управления Tk при желании и не блокировать графический интерфейс!
Вот класс TkRepeatingTask и BackgroundTask:
import threading
class TkRepeatingTask():
def __init__( self, tkRoot, taskFuncPointer, freqencyMillis ):
self.__tk_ = tkRoot
self.__func_ = taskFuncPointer
self.__freq_ = freqencyMillis
self.__isRunning_ = False
def isRunning( self ) : return self.__isRunning_
def start( self ) :
self.__isRunning_ = True
self.__onTimer()
def stop( self ) : self.__isRunning_ = False
def __onTimer( self ):
if self.__isRunning_ :
self.__func_()
self.__tk_.after( self.__freq_, self.__onTimer )
class BackgroundTask():
def __init__( self, taskFuncPointer ):
self.__taskFuncPointer_ = taskFuncPointer
self.__workerThread_ = None
self.__isRunning_ = False
def taskFuncPointer( self ) : return self.__taskFuncPointer_
def isRunning( self ) :
return self.__isRunning_ and self.__workerThread_.isAlive()
def start( self ):
if not self.__isRunning_ :
self.__isRunning_ = True
self.__workerThread_ = self.WorkerThread( self )
self.__workerThread_.start()
def stop( self ) : self.__isRunning_ = False
class WorkerThread( threading.Thread ):
def __init__( self, bgTask ):
threading.Thread.__init__( self )
self.__bgTask_ = bgTask
def run( self ):
try :
self.__bgTask_.taskFuncPointer()( self.__bgTask_.isRunning )
except Exception as e: print repr(e)
self.__bgTask_.stop()
Вот тест Tk, который демонстрирует их использование. Просто добавьте это в конец модуля с этими классами, если вы хотите увидеть демонстрацию в действии:
def tkThreadingTest():
from tkinter import Tk, Label, Button, StringVar
from time import sleep
class UnitTestGUI:
def __init__( self, master ):
self.master = master
master.title( "Threading Test" )
self.testButton = Button(
self.master, text="Blocking", command=self.myLongProcess )
self.testButton.pack()
self.threadedButton = Button(
self.master, text="Threaded", command=self.onThreadedClicked )
self.threadedButton.pack()
self.cancelButton = Button(
self.master, text="Stop", command=self.onStopClicked )
self.cancelButton.pack()
self.statusLabelVar = StringVar()
self.statusLabel = Label( master, textvariable=self.statusLabelVar )
self.statusLabel.pack()
self.clickMeButton = Button(
self.master, text="Click Me", command=self.onClickMeClicked )
self.clickMeButton.pack()
self.clickCountLabelVar = StringVar()
self.clickCountLabel = Label( master, textvariable=self.clickCountLabelVar )
self.clickCountLabel.pack()
self.threadedButton = Button(
self.master, text="Timer", command=self.onTimerClicked )
self.threadedButton.pack()
self.timerCountLabelVar = StringVar()
self.timerCountLabel = Label( master, textvariable=self.timerCountLabelVar )
self.timerCountLabel.pack()
self.timerCounter_=0
self.clickCounter_=0
self.bgTask = BackgroundTask( self.myLongProcess )
self.timer = TkRepeatingTask( self.master, self.onTimer, 1 )
def close( self ) :
print "close"
try: self.bgTask.stop()
except: pass
try: self.timer.stop()
except: pass
self.master.quit()
def onThreadedClicked( self ):
print "onThreadedClicked"
try: self.bgTask.start()
except: pass
def onTimerClicked( self ) :
print "onTimerClicked"
self.timer.start()
def onStopClicked( self ) :
print "onStopClicked"
try: self.bgTask.stop()
except: pass
try: self.timer.stop()
except: pass
def onClickMeClicked( self ):
print "onClickMeClicked"
self.clickCounter_+=1
self.clickCountLabelVar.set( str(self.clickCounter_) )
def onTimer( self ) :
print "onTimer"
self.timerCounter_+=1
self.timerCountLabelVar.set( str(self.timerCounter_) )
def myLongProcess( self, isRunningFunc=None ) :
print "starting myLongProcess"
for i in range( 1, 10 ):
try:
if not isRunningFunc() :
self.onMyLongProcessUpdate( "Stopped!" )
return
except : pass
self.onMyLongProcessUpdate( i )
sleep( 1.5 ) # simulate doing work
self.onMyLongProcessUpdate( "Done!" )
def onMyLongProcessUpdate( self, status ) :
print "Process Update: %s" % (status,)
self.statusLabelVar.set( str(status) )
root = Tk()
gui = UnitTestGUI( root )
root.protocol( "WM_DELETE_WINDOW", gui.close )
root.mainloop()
if __name__ == "__main__":
tkThreadingTest()
Две точки импорта, о которых я расскажу про BackgroundTask:
1) Функция, которую вы запускаете в фоновой задаче, должна брать указатель на функцию, которую она будет вызывать и уважать, что позволяет отменить задачу на полпути - если это возможно.
2) Вы должны убедиться, что фоновая задача остановлена при выходе из приложения. Этот поток все равно будет работать, даже если ваш графический интерфейс закрыт, если вы не обращаетесь к этому!
Я использовал RxPY, в котором есть несколько хороших потоковых функций, чтобы решить эту проблему довольно чисто. Никаких очередей, и я предоставил функцию, которая запускается в основном потоке после завершения фонового потока. Вот рабочий пример:
import rx
from rx.scheduler import ThreadPoolScheduler
import time
import tkinter as tk
class UI:
def __init__(self):
self.root = tk.Tk()
self.pool_scheduler = ThreadPoolScheduler(1) # thread pool with 1 worker thread
self.button = tk.Button(text="Do Task", command=self.do_task).pack()
def do_task(self):
rx.empty().subscribe(
on_completed=self.long_running_task,
scheduler=self.pool_scheduler
)
def long_running_task(self):
# your long running task here... eg:
time.sleep(3)
# if you want a callback on the main thread:
self.root.after(5, self.on_task_complete)
def on_task_complete(self):
pass # runs on main thread
if __name__ == "__main__":
ui = UI()
ui.root.mainloop()
Другой способ использовать эту конструкцию, которая может быть чище (в зависимости от предпочтений):
tk.Button(text="Do Task", command=self.button_clicked).pack()
...
def button_clicked(self):
def do_task(_):
time.sleep(3) # runs on background thread
def on_task_done():
pass # runs on main thread
rx.just(1).subscribe(
on_next=do_task,
on_completed=lambda: self.root.after(5, on_task_done),
scheduler=self.pool_scheduler
)
Проблема в том, что t.join() блокирует событие click, основной поток не возвращается к циклу событий для обработки перерисовок. См. Почему ttk Progressbar появляется после того, как процесс в Tkinter или TTK индикатор выполнения заблокирован при отправке электронной почты
Вот список моих ответов на StackOverflow, касающихся tkinter и многопоточности: