Почему моя функция __exit__ диспетчера контекста запускается до того, как вычисления не будут завершены?

Функция выхода моего пользовательского диспетчера контекста, по-видимому, запускается до завершения вычислений. Мой менеджер контекста предназначен для упрощения написания параллельного / параллельного кода. Вот мой код менеджера контекста:

import time
from multiprocessing.dummy import Pool, cpu_count

class managed_pool:
    '''Simple context manager for multiprocessing.dummy.Pool'''
    def __init__(self, msg):
        self.msg = msg
    def __enter__(self):
        cores = cpu_count()
        print 'start concurrent ({0} cores): {1}'.format(cores, self.msg)
        self.start = time.time()
        self.pool = Pool(cores)
        return self.pool
    def __exit__(self, type_, value, traceback):
        print 'end concurrent:', self.msg
        print 'time:', time.time() - self.start
        self.pool.close()
        self.pool.join()

Я уже пробовал этот скрипт с multiprocessing.Pool вместо multiprocessing.dummy.Pool и, кажется, все время терпит неудачу.

Вот пример использования менеджера контекста:

def read_engine_files(f):
    engine_input = engineInput()
    with open(f, 'rb') as f:
        engine_input.parse_from_string(f.read())
    return engine_input

with managed_pool('load input files') as pool:
    data = pool.map(read_engine_files, files)

Итак, внутри read_engine_files Я печатаю название файла. Вы заметите в __exit__ функция, которую я также распечатываю, когда вычисление сделано и сколько времени это заняло. Но при просмотре стандартного __exit__ сообщение появляется задолго до завершения вычисления. Мол, за минуты до вычисления. Но htop говорит, что все мои ядра все еще используются. Вот пример вывода

start concurrent (4 cores): load engine input files
file1.pbin
file2.pbin
...
file16.pbin
end concurrent: load engine input files
time: 246.43829298
file17.pbin
...
file45.pbin

Почему __exit__ звонить так рано?

1 ответ

Вы уверены, что просто звоните pool.map()? Это должно блокировать, пока все элементы не были сопоставлены.

Если вы вызываете один из асинхронных методов Poolтогда вы сможете решить проблему, изменив порядок вещей в __exit__(), Просто присоединитесь к бассейну, прежде чем делать резюме.

def __exit__(self, type_, value, traceback):
    self.pool.close()
    self.pool.join()
    print 'end concurrent:', self.msg
    print 'time:', time.time() - self.start

Наиболее вероятное объяснение состоит в том, что произошло исключение. Приведенный выше пример кода не анализирует type, value или же traceback аргументы __exit__ заявление. Таким образом, возникает исключение (и оно не перехватывается ранее), передается оператору выхода, который в свою очередь не реагирует на него. Процессы (или некоторые из них) продолжают работать.

Другие вопросы по тегам