Многопроцессорная обработка итерируемого в Python
Я пытаюсь разделить следующий код, чтобы учесть многопроцессорность в python, и это действительно становится для меня досадной задачей - я новичок в многопроцессорности и прочитал документацию и столько примеров, сколько смог найти, но все еще не нашел решения он будет работать на всех ядрах процессора одновременно.
Я хотел бы разделить итерируемые элементы на четверти и сделать так, чтобы они вычисляли тест параллельно.
Мой пример одного потока:
import itertools as it
import numpy as np
wmod = np.array([[0,1,2],[3,4,5],[6,7,3]])
pmod = np.array([[0,1,2],[3,4,5],[6,7,3]])
plines1 = it.product(wmod[0],wmod[1],wmod[2])
plines2 = it.product(pmod[0],pmod[1],pmod[2])
check = .915
result = []
for count, (A,B) in enumerate(zip(plines1,plines2)):
pass
test = (sum(B)+10)/(sum(A)+12)
if test > check:
result = np.append(result,[A,B])
print('results: ',result)
Я понимаю, что это очень маленький пример пары матриц 3х3, но я хотел бы применить его к паре матриц большего размера, для вычисления которых требуется около часа. Я ценю любой совет, данный.
1 ответ
Я бы предложил использовать очереди для вывода ваших итераций. Что-то вроде того:
import multiprocessing as mp
import numpy as np
import itertools as it
def worker(in_queue, out_queue):
check = 0.915
for a in iter(in_queue.get, 'STOP'):
A = a[0]
B = a[1]
test = (sum(B)+10)/(sum(A)+12)
if test > check:
out_queue.put([A,B])
else:
out_queue.put('')
if __name__ == "__main__":
wmod = np.array([[0,1,2],[3,4,5],[6,7,3]])
pmod = np.array([[0,1,2],[3,4,5],[6,7,3]])
plines1 = it.product(wmod[0],wmod[1],wmod[2])
plines2 = it.product(pmod[0],pmod[1],pmod[2])
# determine length of your iterator
counts = 26
# setup iterator
it = zip(plines1,plines2)
in_queue = mp.Queue()
out_queue = mp.Queue()
# setup workers
numProc = 2
process = [mp.Process(target=worker,
args=(in_queue, out_queue), daemon=True) for x in range(numProc)]
# run processes
for p in process:
p.start()
results = []
control = True
# fill queue and get data
# code fills the queue until a new element is available in the output
# fill blocks if no slot is available in the in_queue
for idx in range(counts):
while out_queue.empty() and control:
# fill the queue
try:
in_queue.put(next(it), block=True)
except StopIteration:
# signals for processes stop
for p in process:
print('stopping')
in_queue.put('STOP')
control = False
break
results.append(out_queue.get(timeout=10))
# wait for processes to finish
for p in process:
p.join()
print(results)
print('finished')
Однако сначала вам нужно будет определить, насколько длинным будет ваш список задач.