EOFError при вызове функции с использованием многопроцессорной обработки для цикла Python
Я работаю над проектом, пишущим функцию сортировки слиянием, которая использует многопроцессорность в Python. Моя функция работает и работает лучше, чем однопотоковая сортировка слиянием, но когда я вызываю функцию через цикл for, она выдает EOFError
, Это моя функция:
def merge(inp):
A = inp[0]
B = inp[1]
out = []
i = 0
j = 0
k = 0
while i < len(A) and j < len(B):
if A[i] <= B[j]:
out.append(A[i])
i+=1
else:
out.append(B[j])
j+=1
while i < len(A):
out.append(A[i])
i += 1
while j < len(B):
out.append(B[j])
j += 1
return(out)
def sort(A, multi = False, conn = None):
B = []
if(not multi):
for i in A: B.append([i])
A = B
else:
pass
while len(A) > 1:
B=[]
i = 0
while i < len(A) - 1:
B.append(merge((A[i], A[i+1])))
i += 2
try:
B.append(A[i])
except:
pass
A = B
if multi:
try:
conn.send(B[0])
conn.close()
except: pass
return(B[0])
def m_sort(A):
B = []
for i in A: B.append([i])
A = B
d = len(A) // cpu_count()
pipes=[]
for i in range(cpu_count()):
parrent, child = Pipe()
pipes.append(parrent)
Process(target = sort, args=(A[i*d:(i+1)*d], True, child,)).start()
B=[]
for i in pipes:
t = i.recv()
B.append(t)
for i in A[d*cpu_count():]:
B.append(i)
B=sort(B,True)
return(B)
m_sort
(мультисортировка) функция разбивает список на n частей, где n - количество ядер ЦП в системе, и запускается merge()
на каждом из них в отдельном процессе. Затем окончательный список, состоящий из n объединенных подсписков, сортируется однопоточным. С этой основной программой все отлично работает:
import random
from multiprocessing import cpu_count, Pipe, Process
if __name__ == '__main__':
A = []
for i in range(0,500000): A.append(random.randint(0,1000))
B = m_sort(A)
print(B == sorted(A))
Но с этим он падает перед первым родом
import random
from multiprocessing import cpu_count, Pipe, Process
if __name__ == '__main__':
for i in range(1, 2500, 10):
A = []
for j in range(i): A.append(random.randint(0,1000))
B = m_sort(A)
print(B==sorted(A))
Трассировка (последний вызов был последним): Файл "C:\Users\user\AppData\Local\Programs\Python\Python36-32\lib\multiprocessing\connection.py", строка 312, в _recv_bytes nread, err = ov.GetOverlappedResult(True) BrokenPipeError: [WinError 109] Канал завершен
Во время обработки вышеупомянутого исключения произошло другое исключение:
Трассировка (последний вызов был последним): Файл "C:\Users\user\Desktop\Python Project\merge_v11.py", строка 86, в файле B = m_sort(A) "C: \ Users \ user \ Desktop \ Python Project \ merge_v11.py ", строка 70, в файле m_sort t = i.recv()" C: \ Users \ user \ AppData \ Local \ Programs \ Python \ Python36-32 \ lib \ multiprocessing \ connection.py ", строка 250, в recv buf = self._recv_bytes() Файл "C:\Users\user\AppData\Local\Programs\Python\Python36-32\lib\multiprocessing\connection.py", строка 321, в _recv_bytes вызывает EOFError EOFError
Может кто-нибудь объяснить мне, в чем проблема и как ее исправить? Заранее спасибо!
[ ОБНОВЛЕНИЕ ]
Я заставил программу работать (не знаю, почему она работает), поместив m_sort (A) в попытку-исключение. Это работает, но делает это примерно в 1000 раз медленнее