Python Process Pool не является демоном?

Можно ли создать пул Python, который не является демоническим? Я хочу, чтобы пул мог вызывать функцию, в которой есть другой пул.

Я хочу этого, потому что демоны не могут создать процесс. В частности, это приведет к ошибке:

AssertionError: daemonic processes are not allowed to have children

Например, рассмотрим сценарий, в котором function_a есть бассейн, который работает function_b который имеет бассейн, который работает function_c, Эта цепочка функций потерпит неудачу, потому что function_b выполняется в процессе демона, и процессы демона не могут создавать процессы.

10 ответов

Решение

multiprocessing.pool.Pool класс создает рабочие процессы в его __init__ метод, делает их демоническими и запускает их, и невозможно восстановить их daemon приписывать False до того, как они будут запущены (и после этого больше не разрешены). Но вы можете создать свой собственный подкласс multiprocesing.pool.Pool (multiprocessing.Pool это просто функция-обертка) и подставить свой собственный multiprocessing.Process подкласс, который всегда не является демоном, для использования в рабочих процессах.

Вот полный пример того, как это сделать. Важными частями являются два класса NoDaemonProcess а также MyPool наверху и позвонить pool.close() а также pool.join() на ваше MyPool экземпляр в конце.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time

from random import randint


class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess

def sleepawhile(t):
    print("Sleeping %i seconds..." % t)
    time.sleep(t)
    return t

def work(num_procs):
    print("Creating %i (daemon) workers and jobs in child." % num_procs)
    pool = multiprocessing.Pool(num_procs)

    result = pool.map(sleepawhile,
        [randint(1, 5) for x in range(num_procs)])

    # The following is not really needed, since the (daemon) workers of the
    # child's pool are killed when the child is terminated, but it's good
    # practice to cleanup after ourselves anyway.
    pool.close()
    pool.join()
    return result

def test():
    print("Creating 5 (non-daemon) workers and jobs in main process.")
    pool = MyPool(5)

    result = pool.map(work, [randint(1, 5) for x in range(5)])

    pool.close()
    pool.join()
    print(result)

if __name__ == '__main__':
    test()

У меня была необходимость использовать недемонический пул в Python 3.7, и в итоге я адаптировал код, опубликованный в принятом ответе. Ниже приведен фрагмент, который создает недемонический пул:

class NoDaemonProcess(multiprocessing.Process):
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        pass


class NoDaemonContext(type(multiprocessing.get_context())):
    Process = NoDaemonProcess

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    def __init__(self, *args, **kwargs):
        kwargs['context'] = NoDaemonContext()
        super(MyPool, self).__init__(*args, **kwargs)

Как текущая реализация multiprocessing была подвергнута значительному рефакторингу с учетом контекста, мы должны предоставить NoDaemonContext класс, который имеет наш NoDaemonProcess как атрибут MyPool затем будет использовать этот контекст вместо контекста по умолчанию.

Тем не менее, я должен предупредить, что у этого подхода есть как минимум 2 предостережения:

  1. Это все еще зависит от деталей реализации multiprocessing пакет, и поэтому может сломаться в любое время.
  2. Есть веские причины, почему multiprocessing усложнил использование недемонических процессов, многие из которых описаны здесь. Наиболее убедительным, на мой взгляд, является:

    Что касается разрешения дочерним потокам порождать своих собственных дочерних элементов, использование подпроцесса сопряжено с риском создания небольшой армии "внуков-зомби", если родительский или дочерний потоки завершаются до завершения и возврата подпроцесса.

concurrent.futures.ProcessPoolExecutor не имеет этого ограничения. Он может иметь пул вложенных процессов без каких-либо проблем:

from concurrent.futures import ProcessPoolExecutor as Pool
from itertools import repeat
from multiprocessing import current_process
import time

def pid():
    return current_process().pid

def _square(i):  # Runs in inner_pool
    square = i ** 2
    time.sleep(i / 10)
    print(f'{pid()=} {i=} {square=}')
    return square

def _sum_squares(i, j):  # Runs in outer_pool
    with Pool(max_workers=2) as inner_pool:
        squares = inner_pool.map(_square, (i, j))
    sum_squares = sum(squares)
    time.sleep(sum_squares ** .5)
    print(f'{pid()=}, {i=}, {j=} {sum_squares=}')
    return sum_squares

def main():
    with Pool(max_workers=3) as outer_pool:
        for sum_squares in outer_pool.map(_sum_squares, range(5), repeat(3)):
            print(f'{pid()=} {sum_squares=}')

if __name__ == "__main__":
    main()

Приведенный выше демонстрационный код был протестирован с Python 3.8.

Кредит: ответ jfs

Многопроцессорный модуль имеет приятный интерфейс для использования пулов с процессами или потоками. В зависимости от вашего текущего варианта использования, вы можете рассмотреть возможность использования multiprocessing.pool.ThreadPool для вашего внешнего пула, что приведет к созданию потоков (которые позволяют создавать процессы изнутри), а не к процессам.

Это может быть ограничено GIL, но в моем конкретном случае (я тестировал оба) время запуска процессов из внешнего Pool как создано здесь, значительно перевешивает решение с ThreadPool,


Это действительно легко поменять местами Processes за Threads, Узнайте больше о том, как использовать ThreadPool Решение здесь или здесь.

В некоторых версиях Python замена стандартного пула на пользовательский может вызвать ошибку: AssertionError: group argument must be None for now,

Здесь я нашел решение, которое может помочь:

class NonDaemonPool(multiprocessing.pool.Pool):
    def Process(self, *args, **kwds):
        proc = super(NonDaemonPool, self).Process(*args, **kwds)

        class NonDaemonProcess(proc.__class__):
            """Monkey-patch process to ensure it is never daemonized"""

            @property
            def daemon(self):
                return False

            @daemon.setter
            def daemon(self, val):
                pass

        proc.__class__ = NonDaemonProcess

        return proc

Проблема, с которой я столкнулся, заключалась в попытке импортировать глобальные переменные между модулями, в результате чего строка ProcessPool() оценивалась несколько раз.

globals.py

from processing             import Manager, Lock
from pathos.multiprocessing import ProcessPool
from pathos.threading       import ThreadPool

class SingletonMeta(type):
    def __new__(cls, name, bases, dict):
        dict['__deepcopy__'] = dict['__copy__'] = lambda self, *args: self
        return super(SingletonMeta, cls).__new__(cls, name, bases, dict)

    def __init__(cls, name, bases, dict):
        super(SingletonMeta, cls).__init__(name, bases, dict)
        cls.instance = None

    def __call__(cls,*args,**kw):
        if cls.instance is None:
            cls.instance = super(SingletonMeta, cls).__call__(*args, **kw)
        return cls.instance

    def __deepcopy__(self, item):
        return item.__class__.instance

class Globals(object):
    __metaclass__ = SingletonMeta
    """     
    This class is a workaround to the bug: AssertionError: daemonic processes are not allowed to have children

    The root cause is that importing this file from different modules causes this file to be reevalutated each time, 
    thus ProcessPool() gets reexecuted inside that child thread, thus causing the daemonic processes bug    
    """
    def __init__(self):
        print "%s::__init__()" % (self.__class__.__name__)
        self.shared_manager      = Manager()
        self.shared_process_pool = ProcessPool()
        self.shared_thread_pool  = ThreadPool()
        self.shared_lock         = Lock()        # BUG: Windows: global name 'lock' is not defined | doesn't affect cygwin

Затем безопасно импортируйте из другого места в вашем коде

from globals import Globals
Globals().shared_manager      
Globals().shared_process_pool
Globals().shared_thread_pool  
Globals().shared_lock         

Я видел, как люди решали эту проблему, используя celeryвилка multiprocessingназывается billiard (расширения многопроцессорного пула), который позволяет демоническим процессам порождать потомков. Решение - просто заменитьmultiprocessing модуль:

import billiard as multiprocessing

Вот как вы можете запустить пул, даже если вы уже находитесь в демоническом процессе. Это было протестировано в python 3.8.5.

Во-первых, определите диспетчер контекста, который временно удаляет состояние демона текущего процесса.

      class Undaemonize(object):
    '''Context Manager to resolve AssertionError: daemonic processes are not allowed to have children
    
    Tested in python 3.8.5'''
    def __init__(self):
        self.p = multiprocessing.process.current_process()
        if 'daemon' in self.p._config:
            self.daemon_status_set = True
        else:
            self.daemon_status_set = False
        self.daemon_status_value = self.p._config.get('daemon')
    def __enter__(self):
        if self.daemon_status_set:
            del self.p._config['daemon']
    def __exit__(self, type, value, traceback):
        if self.daemon_status_set:
            self.p._config['daemon'] = self.daemon_status_value

Теперь вы можете запустить пул следующим образом, даже из процесса демона:

      with Undaemonize():
    pool = multiprocessing.Pool(1)
pool.map(... # you can do something with the pool outside of the context manager 

В то время как другие подходы здесь направлены на создание пула, который в первую очередь не является демоническим, этот подход позволяет вам запустить пул, даже если вы уже находитесь в демоническом процессе. Это особенно полезно, если состояние родительского процесса нельзя легко контролировать, например, потому что он был создан какой-либо распределенной вычислительной средой, такой как dask.

Начиная с Python версии 3.7 мы можем создавать недемонический ProcessPoolExecutor.

С использованиемif __name__ == "__main__":необходимо при использовании многопроцессорной обработки.

      from concurrent.futures import ProcessPoolExecutor as Pool

num_pool = 10
    
def main_pool(num):
    print(num)
    strings_write = (f'{num}-{i}' for i in range(num))
    with Pool(num) as subp:
        subp.map(sub_pool,strings_write)
    return None


def sub_pool(x):
    print(f'{x}')
    return None


if __name__ == "__main__":
    with Pool(num_pool) as p:
        p.map(main_pool,list(range(1,num_pool+1)))

Это обходной путь, когда ошибка кажется ложноположительной. Как также отметил Джеймс, это может произойти при непреднамеренном импорте из демонического процесса.

Например, если у вас есть следующий простой код, WORKER_POOL могут быть непреднамеренно импортированы из рабочего, что приведет к ошибке.

import multiprocessing

WORKER_POOL = multiprocessing.Pool()

Простой, но надежный способ обходного пути:

import multiprocessing
import multiprocessing.pool


class MyClass:

    @property
    def worker_pool(self) -> multiprocessing.pool.Pool:
        # Ref: https://stackru.com/a/63984747/
        try:
            return self._worker_pool  # type: ignore
        except AttributeError:
            # pylint: disable=protected-access
            self.__class__._worker_pool = multiprocessing.Pool()  # type: ignore
            return self.__class__._worker_pool  # type: ignore
            # pylint: enable=protected-access

В указанном выше обходном пути MyClass.worker_poolможно использовать без ошибок. Если вы считаете, что этот подход можно улучшить, дайте мне знать.

Другие вопросы по тегам