Можно ли использовать concurrent.futures для выполнения функции / метода внутри класса tkinter после события? Если да, то как?

Я пытаюсь использовать пул работников, предоставленных concurrent.futures.ProcessPoolExecutor ускорить выполнение метода внутри класса tkinter. Это связано с тем, что выполнение метода требует значительных ресурсов процессора и "распараллеливание" должно сократить время его завершения. Я надеюсь сравнить его производительность с контролем - последовательное выполнение того же метода. Я написал тестовый код tkinter GUI для выполнения этого теста. Последовательное выполнение метода работает, но параллельная часть не работает. Цени любую помощь, чтобы заставить работать параллельную часть моего кода.

Обновление: я гарантировал, что я правильно реализовал concurrent.futures.ProcessPoolExecutor решить мою проблему вне Tk(), т.е. из стандартного скрипта python3. Это объясняется в этом ответе. Теперь я хочу реализовать параллельный метод, описанный в этом ответе, для работы с кнопкой в ​​моем графическом интерфейсе tkinter.Tk().

Мой тестовый код приведен ниже. Когда вы запустите его, появится графический интерфейс. Когда вы нажимаете кнопку "НАЙТИ", функция _findmatch будет выполняться последовательно и одновременно, чтобы определить, сколько раз число 5 встречается в диапазоне от 0 до 1E8. Серийная часть работает, но параллельная часть жалуется (см. Ниже). Кто-нибудь знает, как исправить эту ошибку травления?

Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/queues.py", line 241, in _feed
    obj = ForkingPickler.dumps(obj)
  File "/usr/lib/python3.5/multiprocessing/reduction.py", line 50, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <class '_tkinter.tkapp'>: attribute lookup tkapp on _tkinter failed

Тестовый код:

#!/usr/bin/python3
# -*- coding: utf-8 -*-

import tkinter as tk # Python 3 tkinter modules
import tkinter.ttk as ttk
import concurrent.futures as cf
from time import time, sleep
from itertools import repeat, chain 

class App(ttk.Frame):
    def __init__(self, parent):
        # Initialise App Frame
        ttk.Frame.__init__(self, parent, style='App.TFrame')
        self.parent=parent

        self.button = ttk.Button(self, style='start.TButton', text = 'FIND',
                                 command=self._check)
        self.label0 = ttk.Label(self, foreground='blue')
        self.label1 = ttk.Label(self, foreground='red')
        self.label2 = ttk.Label(self, foreground='green')
        self._labels()
        self.button.grid(row=0, column=1, rowspan=3, sticky='nsew')
        self.label0.grid(row=0, column=0, sticky='nsew')
        self.label1.grid(row=1, column=0, sticky='nsew')
        self.label2.grid(row=2, column=0, sticky='nsew')

    def _labels(self):
        self.label0.configure(text='Click "FIND" to see how many times the number 5 appears.')
        self.label1.configure(text='Serial Method:')
        self.label2.configure(text='Concurrent Method:')

    def _check(self):
        # Initialisation
        self._labels()
        nmax = int(1E7)
        smatch=[]
        cmatch=[]
        number = '5'
        self.label0.configure(
            text='Finding the number of times {0} appears in 0 to {1}'.format(
                number, nmax))
        self.parent.update_idletasks()

        # Run serial code
        start = time()
        smatch = self._findmatch(0, nmax, number)
        end = time() - start
        self.label1.configure(
            text='Serial: Found {0} occurances,  Time to Find: {1:.6f}sec'.format(
                len(smatch), end))

        # Run serial code concurrently with concurrent.futures
        workers = 6     # Pool of workers
        chunks_vs_workers = 30 # A factor of =>14 can provide optimum performance 
        num_of_chunks = chunks_vs_workers * workers
        start = time()
        cmatch = self._concurrent_map(nmax, number, workers, num_of_chunks)
        end = time() - start
        self.label2.configure(
            text='Concurrent: Found {0} occurances,  Time to Find: {1:.6f}sec'.format(
                len(cmatch), end))

    def _findmatch(self, nmin, nmax, number):
        '''Function to find the occurence of number in range nmin to nmax and return
           the found occurences in a list.'''
        start = time()
        match=[]
        for n in range(nmin, nmax):
            if number in str(n): match.append(n)
        end = time() - start
        #print("\n def _findmatch {0:<10} {1:<10} {2:<3} found {3:8} in {4:.4f}sec".
        #      format(nmin, nmax, number, len(match),end))
        return match

    def _concurrent_map(self, nmax, number, workers, num_of_chunks):
        '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
           find the occurrences of a given number in a number range in a concurrent
           manner.'''
        # 1. Local variables
        start = time()
        chunksize = nmax // num_of_chunks
        #2. Parallelization
        with cf.ProcessPoolExecutor(max_workers=workers) as executor:
            # 2.1. Discretise workload and submit to worker pool
            cstart = (chunksize * i for i in range(num_of_chunks))
            cstop = (chunksize * i if i != num_of_chunks else nmax
                     for i in range(1, num_of_chunks + 1))
            futures = executor.map(self._findmatch, cstart, cstop, repeat(number))
        end = time() - start
        print('\n within statement of def _concurrent_map(nmax, number, workers, num_of_chunks):')
        print("found in {0:.4f}sec".format(end))
        return list(chain.from_iterable(futures))


if __name__ == '__main__':
    root = tk.Tk()
    root.title('App'), root.geometry('550x60')
    app = App(root)
    app.grid(row=0, column=0, sticky='nsew')

    root.rowconfigure(0, weight=1)
    root.columnconfigure(0, weight=1)
    app.columnconfigure(0, weight=1)

    app.mainloop()

1 ответ

Решение

Я наконец нашел способ ответить на мой вопрос.

В книге Марка Саммерфилда "Питон на практике" (2014) упоминается, что multiprocessing модуль, вызываемый concurrent.futures.ProcessPoolExecutor, может вызывать только функции, которые можно импортировать, и использовать данные модулей (вызываемые функциями), которые можно выбирать. Как таковой, он необходим для concurrent.futures.ProcessPoolExecutor и функции (с его аргументом), которые он вызывает, находятся в отдельном модуле, чем модуль GUI tkinter, иначе он не будет работать.

Таким образом, я создал отдельный класс для размещения всех кодов, связанных с concurrent.futures.ProcessPoolExecutor и функции и данные, которые он вызывал, вместо того, чтобы помещать их в приложение класса, мой класс графического интерфейса tkinter.Tk(), как я делал ранее. Это сработало!

Мне также удалось использовать threading.Threads выполнять параллельное выполнение моих последовательных и параллельных задач.

Я делюсь своим пересмотренным тестовым кодом ниже, чтобы продемонстрировать, как я это сделал, и надеюсь, что это поможет любому, кто пытается использовать concurrent.futures с ткинтером.

Очень приятно видеть, как все процессоры набирают обороты с Tk GUI.:)

Пересмотренный тестовый код:

#!/usr/bin/python3
# -*- coding: utf-8 -*-
''' Code to demonstrate how to use concurrent.futures.Executor object with tkinter.'''

import tkinter as tk # Python 3 tkinter modules
import tkinter.ttk as ttk
import concurrent.futures as cf
import threading
from time import time, sleep
from itertools import chain 


class App(ttk.Frame):
    def __init__(self, parent):
        # Initialise App Frame
        ttk.Frame.__init__(self, parent)
        self.parent=parent

        self.button = ttk.Button(self, text = 'FIND', command=self._check)
        self.label0 = ttk.Label(self, foreground='blue')
        self.label1 = ttk.Label(self, foreground='red')
        self.label2 = ttk.Label(self, foreground='green')
        self._labels()
        self.button.grid(row=0, column=1, rowspan=3, sticky='nsew')
        self.label0.grid(row=0, column=0, sticky='nsew')
        self.label1.grid(row=1, column=0, sticky='nsew')
        self.label2.grid(row=2, column=0, sticky='nsew')

    def _labels(self):
        self.label0.configure(text='Click "FIND" to see how many times the number 5 appears.')
        self.label1.configure(text='Serial Method:')
        self.label2.configure(text='Concurrent Method:')

    def _check(self):
        # Initialisation
        self._labels()
        nmax = int(1E8)
        workers = 6     # Pool of workers
        chunks_vs_workers = 30 # A factor of =>14 can provide optimum performance 
        num_of_chunks = chunks_vs_workers * workers
        number = '5'
        self.label0.configure(
            text='Finding the number of times {0} appears in 0 to {1}'.format(
                number, nmax))
        self.parent.update_idletasks()
        # Concurrent management of serial and concurrent tasks using threading
        self.serworker = threading.Thread(target=self._serial,
                                          args=(0, nmax, number))
        self.subworker  = threading.Thread(target=self._concurrent,
                                           args=(nmax, number, workers,
                                                 num_of_chunks))
        self.serworker.start()         
        self.subworker.start()         

    def _serial(self, nmin, nmax, number):
        fm = Findmatch
        # Run serial code
        start = time()
        smatch = fm._findmatch(fm, 0, nmax, number)
        end = time() - start
        self.label1.configure(
            text='Serial Method: {0} occurrences, Compute Time: {1:.6f}sec'.format(
                len(smatch), end))
        self.parent.update_idletasks()
        #print('smatch = ', smatch) 

    def _concurrent(self, nmax, number, workers, num_of_chunks): 
        fm = Findmatch
        # Run serial code concurrently with concurrent.futures .submit()
        start = time()
        cmatch = fm._concurrent_submit(fm, nmax, number, workers,
                                        num_of_chunks)
        end = time() - start
        self.label2.configure(
            text='Concurrent Method: {0} occurrences, Compute Time: {1:.6f}sec'.format(
                len(cmatch), end))
        self.parent.update_idletasks()
        #print('cmatch = ', cmatch) 


class Findmatch:
    ''' A class specially created to host concurrent.futures.ProcessPoolExecutor
        so that the function(s) it calls can be accessible by multiprocessing
        module. Multiprocessing requirements: codes must be importable and code
        data must be pickerable. ref. Python in Practice, by Mark Summerfields,
        section 4.3.2, pg 173, 2014'''
    def __init__(self):
        self.__init__(self)

    def _findmatch(self, nmin, nmax, number):
        '''Function to find the occurence of number in range nmin to nmax and return
           the found occurences in a list.'''
        start = time()
        match=[]
        for n in range(nmin, nmax):
            if number in str(n): match.append(n)
        end = time() - start
        #print("\n def _findmatch {0:<10} {1:<10} {2:<3} found {3:8} in {4:.4f}sec".
        #      format(nmin, nmax, number, len(match),end))
        return match

    def _concurrent_submit(self, nmax, number, workers, num_of_chunks):
        '''Function that utilises concurrent.futures.ProcessPoolExecutor.submit to
           find the occurrences of a given number in a number range in a concurrent
           manner.'''
        # 1. Local variables
        start = time()
        chunksize = nmax // num_of_chunks
        self.futures = []
        #2. Parallelization
        with cf.ProcessPoolExecutor(max_workers=workers) as executor:
            # 2.1. Discretise workload and submit to worker pool
            for i in range(num_of_chunks):
                cstart = chunksize * i
                cstop = chunksize * (i + 1) if i != num_of_chunks - 1 else nmax
                self.futures.append(executor.submit(
                    self._findmatch, self, cstart, cstop, number))
        end = time() - start
        print('\n within statement of def _concurrent_submit(nmax, number, workers, num_of_chunks):')
        print("found in {0:.4f}sec".format(end))
        return list(chain.from_iterable(f.result() for f in cf.as_completed(
            self.futures)))


if __name__ == '__main__':
    root = tk.Tk()
    root.title('App'), root.geometry('550x60')
    app = App(root)
    app.grid(row=0, column=0, sticky='nsew')

    root.rowconfigure(0, weight=1)
    root.columnconfigure(0, weight=1)
    app.columnconfigure(0, weight=1)

    app.mainloop()
Другие вопросы по тегам