Многопроцессорная обработка внутри класса Python

Я пытаюсь сделать параллельные вычисления, чтобы ускорить цикл for (я уже использую itertools, мне нужно больше скорости, так как я делаю цикл for несколько раз). Я новичок в многопроцессорности. Я проверил несколько вопросов о переполнении стека и попытался решить свою проблему, однако у меня все еще есть некоторые трудности. Я создаю общие переменные (self.A, self.B, self.C), чтобы многопроцессорная обработка выполнялась эффективно. Тем не менее, я думаю, что я делаю что-то не так, поскольку, когда я проверяю свои переменные после вычисления, я вижу, что они не изменились. Мой код немного сложен, поэтому приведенный ниже код является примером кода, который демонстрирует мою проблему. Спасибо за вашу помощь!

import numpy as np
from multiprocessing import Process, Array, Pool
from ctypes import c_double
import itertools

class F():
     def __init__(self, num_process=4):
        self.num_process = num_process
        self.idx = list(itertools.product(range(5), range(10)))
        self.A = np.zeros((5, 10)) 
        if self.num_process > 1:
            self.A = np.frombuffer(Array(c_double, self.A.flat, lock=False))
            self.A.resize(5,10)

     def solve(self):       
         self.B = np.zeros((10, 5, 10)) 
         self.C = np.zeros((10, 5, 10)) 
         if self.num_process > 1:
            self.B = np.frombuffer(Array(c_double, self.B.flat, lock=False))
            self.B.resize(10,5,10)
            self.C = np.frombuffer(Array(c_double, self.C.flat, lock=False))
            self.C.resize(10,5,10)
         print('Before=',self.A,self.B,self.C)
         for i in range(10):   
             if self.num_process == 1:
                 for (k,l) in self.idx:    
                     self.B[i,k,l]=1
                     self.C[i,k,l]=1

             else:
                 workers = []
                 for worker_num in range(self.num_process):
                     worker = Process(target=F.update, 
                                         args=(i, worker_num, self.num_process,
                                               self.idx, self.A, self.B, self.C))
                     workers.append(worker)
                     worker.start()
                 for worker in workers:
                     worker.join() 
         print('After=',self.A,self.B,self.C)

     @staticmethod 
     def update( i, worker_num, num_process, idx, A, B, C):
        start_num = int(len(idx) * (worker_num/num_process))
        end_num = int(len(idx) * ((worker_num+1)/num_process))
        for j in range(start_num, end_num):
            k,l = idx[j]
            B[i,k,l]=min(2,A[k,l])
            C[i,k,l]=2

if __name__ == '__main__':

    var=F()
    var.solve()

Когда я печатаю свои переменные после вычисления, я вижу, что они не изменились.

ОБНОВИТЬ
Я смог исправить свой код и выполнить многопроцессорную обработку, используя приведенный ниже код. Моя ошибка, как указал Рики Ким, заключалась в том, что я не создавал общие переменные. Приведенный ниже код достигает этого, однако он все еще медленнее, чем при использовании одного процесса (конечно, для выполнения тех же операций). Любые идеи о том, как сделать многопроцессорность быстрее и эффективнее. Спасибо!

import numpy as np
import multiprocessing as mp
from multiprocessing import Process, Array, Pool
from ctypes import c_double
import itertools

class F():
     def __init__(self, num_process=4):
        self.num_process = num_process
        self.idx = list(itertools.product(range(5), range(10)))
        self.A = np.zeros((5, 10)) 

     def solve(self):       
         B_shared = Array(c_double, 10*5*10)
         C_shared = Array(c_double, 10*5*10)
         self.B = np.frombuffer(B_shared.get_obj())
         self.B = self.B.reshape(10,5,10)
         self.C = np.frombuffer(C_shared.get_obj())
         self.C = self.C.reshape(10,5,10)
         print('Before=',self.A,self.B,self.C)
         for i in range(10):   
             if self.num_process == 1:
                 # perform some expensive operation
                 for (k,l) in self.idx:    
                     self.B[i,k,l]=1
                     self.C[i,k,l]=1

             else:
                 workers = []
                 for worker_num in range(self.num_process):
                     worker = Process(target=self.update, 
                                         args=(i, worker_num, self.num_process, B_shared, C_shared))
                     workers.append(worker)
                     worker.start()
                 for worker in workers:
                     worker.join() 
         print('After=',self.A,self.B,self.C)


     def update(self, i, worker_num, num_process, B_shared, C_shared):
        B = np.frombuffer(B_shared.get_obj())
        B = B.reshape((10,5,10)) 
        C = np.frombuffer(B_shared.get_obj())
        C = C.reshape((10,5,10)) 
        start_num = int(len(self.idx) * (worker_num/num_process))
        end_num = int(len(self.idx) * ((worker_num+1)/num_process))
        for j in range(start_num, end_num):
            # perform some expensive operation
            k,l = self.idx[j]
            B[i,k,l]=min(2,self.A[k,l])
            C[i,k,l]=2

if __name__ == '__main__':
    mp.freeze_support()
    var=F()
    var.solve()

0 ответов

Другие вопросы по тегам