Joblib Parallel использует только одно ядро, если запущено из QThread
Я разрабатываю графический интерфейс, который выполняет некоторые сложные вычисления. Чтобы ускорить процесс, я использую параллельное выполнение joblib вместе с QThreads pyqt, чтобы графический интерфейс пользователя не отвечал на запросы. Параллельное выполнение пока работает нормально, но если оно встроено в графический интерфейс и запущено в своем собственном потоке, оно использует только одно из 4 моих ядер. Что-нибудь фундаментальное, что я пропустил в мире многопоточности?
Вот примерный набросок моей установки:
class ThreadRunner(QtCore.QObject):
start = QtCore.pyqtSignal()
result_finished = QtCore.pyqtSignal(np.ndarray)
def __init__(self, function, *args, **kwargs):
super(DispMapRunner, self).__init__()
self.function = function
self.args = args
self.kwargs = kwargs
self.start.connect(self.run)
def run(self):
print "New Thread started"
result = self.function(*self.args, **self.kwargs)
self.result_finished.emit(result)
class Gui(QtGui.QMainWindow, form_class):
def __init__(self, cl_args, parent=None):
super(Gui, self).__init__()
#other stuff
def start_thread(self, fun, *args, **kwargs):
self.runner = ThreadRunner(fun, *args, **kwargs)
self.thread = QtCore.QThread()
self.runner.moveToThread(self.thread)
# more stuff for catching results
def slice_producer(data):
n_images, rows, cols = data.shape[:3]
for r in range(rows):
yield np.copy(data[:,r,...])
def run_parallel(data, *args, **kwargs):
results = joblib.Parallel(
n_jobs=4,
verbose=12,
pre_dispatch='1.5*n_jobs'
)
(
delayed(
memory.cache(do_long_computation))
(slice, **kwargs) for slice in slice_producer(data)
)
Я надеюсь, что это не слишком долго и в то же время слишком расплывчато. Я использую pyqt4 4.11.3 и joblib 0.8.4.
Я снова проверил свой код и заметил следующее предупреждение:
UserWarning: Multiprocessing backed parallel loops cannot
be nested below threads, setting n_jobs=1
Что уточняет мой вопрос к следующему: Как запустить многопроцессорный процесс в отдельном потоке?
1 ответ
Хорошо, благодаря ekhumoro, я пришел к тому, что работает, использует только экземпляр mp.pool и работает с обратными вызовами. Единственным недостатком является то, что ошибки в дочернем процессе молча терпят неудачу (например, изменение результатов приводит к f_wrapper). Вот код для дальнейшего использования:
from PyQt4.QtCore import *
from PyQt4.QtGui import *
import multiprocessing
import sys
import numpy as np
import time
def f(data_slice, **kwargs):
'''This is a time-intensive function, which we do not want to alter
'''
data = 0
for row in range(data_slice.shape[0]):
for col in range(data_slice.shape[1]):
data += data_slice[row,col]**2
time.sleep(0.1)
return data, 3, 5, 3 # some dummy calculation results
def f_wrapper(row, data_slice, **kwargs):
results = f(data_slice, **kwargs)
return row, results
class MainWindow(QMainWindow): #You can only add menus to QMainWindows
def __init__(self):
super(MainWindow, self).__init__()
self.pool = multiprocessing.Pool(processes=4)
button1 = QPushButton('Connect', self)
button1.clicked.connect(self.apply_connection)
self.text = QTextEdit()
vbox1 = QVBoxLayout()
vbox1.addWidget(button1)
vbox1.addWidget(self.text)
myframe = QFrame()
myframe.setLayout(vbox1)
self.setCentralWidget(myframe)
self.show() #display and activate focus
self.raise_()
def apply_connection(self):
self.rows_processed = list()
self.max_size = 1000
data = np.random.random(size = (100, self.max_size,self.max_size))
kwargs = {'some_kwarg' : 1000}
for row in range(data.shape[1]):
slice = data[:,row, :]
print "starting f for row ", row
result = self.pool.apply_async(f_wrapper,
args = (row, slice),
kwds = kwargs,
callback=self.update_gui)
#~ result.get() # blocks gui, but raises errors for debugging
def update_gui(self, result):
row, func_result = result
self.rows_processed.append(row)
print len(self.rows_processed)
print func_result# or do something more intelligent
self.text.append('Applied connection. Row = %d\n' % row)
if len(self.rows_processed) == self.max_size:
print "Done!"
if __name__ == '__main__':
app = QApplication(sys.argv)
gui = MainWindow()
app.exec_()
Если есть хороший способ фиксировать ошибки, это будет хорошим бонусом.