Как запустить вложенные, иерархические пафосные многопроцессорные карты?
Собрав значительную часть своего кода на сериализации / травлении укропов, я также пытаюсь использовать пафосную многопроцессорность для распараллеливания моих вычислений. Пафос это естественное продолжение укропа.
При попытке запустить вложенный
from pathos.multiprocessing import ProcessingPool
ProcessingPool().map(fn, args)
внутри другого ProcessingPool().map
тогда я получу:
AssertionError: daemonic processes are not allowed to have children
Например:
from pathos.multiprocessing import ProcessingPool
def triple(x):
return 3*x
def refork(x):
from pathos.multiprocessing import ProcessingPool
return ProcessingPool().map(triple, xrange(5))
ProcessingPool().map(refork, xrange(3))
доходность
AssertionError: daemonic processes are not allowed to have children
Я пытался с помощью amap(...).get()
безуспешно. Это на пафос 0.2.0.
Каков наилучший способ учета вложенного распараллеливания?
Обновить
Я должен быть честным в этом пункте, и признаться, что я удалил утверждение "daemonic processes are not allowed to have children"
от пафоса Я также построил что-то каскадное KeyboardInterrupt
для рабочих и рабочих тех... Части решения ниже:
def run_parallel(exec_func, exec_args, num_workers_i)
pool = ProcessingPool(num_workers_i)
pool.restart(force=True)
pid_is = pool.map(get_pid_i, xrange(num_workers_i))
try:
results = pool.amap(
exec_func,
exec_args,
)
counter_i = 0
while not results.ready():
sleep(2)
if counter_i % 60 == 0:
print('Waiting for children running in pool.amap() with PIDs: {}'.format(pid_is))
counter_i += 1
results = results.get()
pool.close()
pool.join()
except KeyboardInterrupt:
print('Ctrl+C received, attempting to terminate pool...')
hard_kill_pool(pid_is, pool) # sending Ctrl+C
raise
except:
print('Attempting to close parallel after exception: {}'.format(sys.exc_info()[0]))
cls.hard_kill_pool(pid_is, pool) # sending Ctrl+C
raise
def hard_kill_pool(pid_is, pool):
for pid_i in pid_is:
os.kill(pid_i, signal.SIGINT) # sending Ctrl+C
pool.terminate()
Кажется, работает с консоли и записной книжки IPython (с кнопкой остановки), но не уверен, что это на 100% правильно во всех угловых случаях.
1 ответ
Я столкнулся с точно такой же проблемой. В моем случае, внутренняя операция была той, которая нуждалась в параллелизме, поэтому я сделал ThreadingPool
из ProcessingPool
, Вот ваш пример:
from pathos.multiprocessing import ProcessingPool, ThreadingPool
def triple(x):
return 3*x
def refork(x):
from pathos.multiprocessing import ProcessingPool
return ProcessingPool().map(triple, xrange(5))
ThreadingPool().map(refork, xrange(3))
Вы можете даже иметь другой слой с другим внешним пулом потоков. В зависимости от вашего случая вы можете инвертировать порядок этих пулов. Однако вы не можете иметь процессы процессов. Если действительно необходимо, см.: /questions/25429364/python-process-pool-ne-yavlyaetsya-demonom/25429375#25429375. Я еще не пробовал это сам, поэтому я не могу уточнить это.