EOFError при вызове функции с использованием многопроцессорной обработки для цикла Python

Я работаю над проектом, пишущим функцию сортировки слиянием, которая использует многопроцессорность в Python. Моя функция работает и работает лучше, чем однопотоковая сортировка слиянием, но когда я вызываю функцию через цикл for, она выдает EOFError, Это моя функция:

def merge(inp):
    A = inp[0]
    B = inp[1]
    out = []
    i = 0
    j = 0
    k = 0
    while i < len(A) and j < len(B):
        if A[i] <= B[j]:
            out.append(A[i])
            i+=1
        else:
            out.append(B[j])
            j+=1
    while i < len(A):
        out.append(A[i])
        i += 1
    while j < len(B):
        out.append(B[j])
        j += 1
    return(out)

def sort(A, multi = False, conn = None):
    B = []

    if(not multi):
        for i in A: B.append([i])
        A = B
    else:
        pass

    while len(A) > 1:
        B=[]
        i = 0
        while i < len(A) - 1:
            B.append(merge((A[i], A[i+1])))
            i += 2
        try:
            B.append(A[i])
        except:
            pass
        A = B
    if multi:
        try:
            conn.send(B[0])
            conn.close()
        except: pass
    return(B[0])

def m_sort(A):
            B = []
    for i in A: B.append([i])
    A = B
    d = len(A) // cpu_count()
    pipes=[]
    for i in range(cpu_count()):
        parrent, child = Pipe()
        pipes.append(parrent)
        Process(target = sort, args=(A[i*d:(i+1)*d], True, child,)).start()
    B=[]
    for i in pipes:
        t = i.recv()
        B.append(t)
    for i in A[d*cpu_count():]:
        B.append(i)
    B=sort(B,True)
    return(B)

m_sort (мультисортировка) функция разбивает список на n частей, где n - количество ядер ЦП в системе, и запускается merge() на каждом из них в отдельном процессе. Затем окончательный список, состоящий из n объединенных подсписков, сортируется однопоточным. С этой основной программой все отлично работает:

import random
from multiprocessing import cpu_count, Pipe, Process
if __name__ == '__main__':
    A = []
    for i in range(0,500000): A.append(random.randint(0,1000))
    B = m_sort(A)
    print(B == sorted(A))

Но с этим он падает перед первым родом

import random
from multiprocessing import cpu_count, Pipe, Process
if __name__ == '__main__':
    for i in range(1, 2500, 10):
        A = []
        for j in range(i): A.append(random.randint(0,1000))
        B = m_sort(A)
        print(B==sorted(A))

Трассировка (последний вызов был последним): Файл "C:\Users\user\AppData\Local\Programs\Python\Python36-32\lib\multiprocessing\connection.py", строка 312, в _recv_bytes nread, err = ov.GetOverlappedResult(True) BrokenPipeError: [WinError 109] Канал завершен

Во время обработки вышеупомянутого исключения произошло другое исключение:

Трассировка (последний вызов был последним): Файл "C:\Users\user\Desktop\Python Project\merge_v11.py", строка 86, в файле B = m_sort(A) "C: \ Users \ user \ Desktop \ Python Project \ merge_v11.py ", строка 70, в файле m_sort t = i.recv()" C: \ Users \ user \ AppData \ Local \ Programs \ Python \ Python36-32 \ lib \ multiprocessing \ connection.py ", строка 250, в recv buf = self._recv_bytes() Файл "C:\Users\user\AppData\Local\Programs\Python\Python36-32\lib\multiprocessing\connection.py", строка 321, в _recv_bytes вызывает EOFError EOFError

Может кто-нибудь объяснить мне, в чем проблема и как ее исправить? Заранее спасибо!

[ ОБНОВЛЕНИЕ ]
Я заставил программу работать (не знаю, почему она работает), поместив m_sort (A) в попытку-исключение. Это работает, но делает это примерно в 1000 раз медленнее

0 ответов

Другие вопросы по тегам