Процесс Python не очищен для повторного использования

Процессы, не очищенные для повторного использования

Всем привет,

Я наткнулся на проблему с ProcessPoolExecutorтам, где процессы обращаются к данным, они не должны быть в состоянии. Позволь мне объяснить:

У меня есть ситуация, похожая на приведенный ниже пример: я получил несколько прогонов, чтобы начать с разных аргументов каждый. Они вычисляют свои вещи параллельно и не имеют причин взаимодействовать друг с другом. Теперь, насколько я понимаю, когда процесс разветвляется, он дублирует сам себя. Дочерний процесс имеет те же данные (память), что и его родительский процесс, но если он что-либо меняет, он делает это в своей собственной копии. Если бы я хотел, чтобы изменения пережили время жизни дочернего процесса, я бы вызвал очереди, каналы и другие вещи IPC.

Но я на самом деле нет! Каждый из процессов манипулирует данными для своих собственных, которые не должны переноситься ни на один из других прогонов. Пример ниже показывает иное, хотя. Следующие прогоны (не параллельные прогоны) могут получить доступ к данным своего предыдущего прогона, что означает, что данные не были удалены из процесса.

Код / Пример

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import current_process, set_start_method

class Static:
    integer: int = 0

def inprocess(run: int) -> None:
    cp = current_process()
    # Print current state
    print(f"[{run:2d} {cp.pid} {cp.name}] int: {Static.integer}", flush=True)

    # Check value
    if Static.integer != 0:
        raise Exception(f"[{run:2d} {cp.pid} {cp.name}] Variable already set!")

    # Update value
    Static.integer = run + 1

def pooling():
    cp = current_process()
    # Get master's pid
    print(f"[{cp.pid} {cp.name}] Start")
    with ProcessPoolExecutor(max_workers=2) as executor:
        for i, _ in enumerate(executor.map(inprocess, range(4))):
            print(f"run #{i} finished", flush=True)

if __name__ == '__main__':
    set_start_method("fork")    # enforce fork
    pooling()

Выход

[1998 MainProcess] Start
[ 0 2020 Process-1] int: 0
[ 2 2020 Process-1] int: 1
[ 1 2021 Process-2] int: 0
[ 3 2021 Process-2] int: 2
run #0 finished
run #1 finished
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
  File "/usr/lib/python3.6/concurrent/futures/process.py", line 175, in _process_worker
    r = call_item.fn(*call_item.args, **call_item.kwargs)
  File "/usr/lib/python3.6/concurrent/futures/process.py", line 153, in _process_chunk
    return [fn(*args) for args in chunk]
  File "/usr/lib/python3.6/concurrent/futures/process.py", line 153, in <listcomp>
    return [fn(*args) for args in chunk]
  File "<stdin>", line 14, in inprocess
Exception: [ 2 2020 Process-1] Variable already set!
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<stdin>", line 29, in <module>
  File "<stdin>", line 24, in pooling
  File "/usr/lib/python3.6/concurrent/futures/process.py", line 366, in _chain_from_iterable_of_lists
    for element in iterable:
  File "/usr/lib/python3.6/concurrent/futures/_base.py", line 586, in result_iterator
    yield fs.pop().result()
  File "/usr/lib/python3.6/concurrent/futures/_base.py", line 425, in result
    return self.__get_result()
  File "/usr/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
Exception: [ 2 2020 Process-1] Variable already set!

Это поведение также может быть воспроизведено с max_workers=1, так как процесс используется повторно. Метод start не влияет на ошибку (хотя только "fork"кажется, использовать более одного процесса).


Итак, подведем итог: я хочу, чтобы каждый новый прогон в процессе со всеми предыдущими данными, но без новых данных из других прогонов. Это возможно? Как бы я этого достиг? Почему вышеизложенное не делает именно это?

Я ценю любую помощь.


я нашел multiprocessing.pool.Pool где можно установить maxtasksperchild=1рабочий процесс уничтожается, когда его задача завершена. Но мне не нравитсяmultiprocessing интерфейс; ProcessPoolExecutor удобнее в использовании. Кроме того, вся идея пула заключается в том, чтобы сэкономить время на настройку процесса, которое будет отбрасываться при уничтожении процесса хостинга после каждого запуска.

2 ответа

Совершенно новые процессы в Python не разделяют состояние памяти. тем не мение ProcessPoolExecutor повторно использует экземпляры процесса. В конце концов, это пул активных процессов. Я предполагаю, что это сделано для того, чтобы операционная система не могла постоянно загружать и запускать процессы.

Такое же поведение вы наблюдаете и в других технологиях распространения, таких как сельдерей, где, если вы не будете осторожны, вы можете удалить глобальное состояние между выполнениями.

Я рекомендую вам лучше управлять своим пространством имен, чтобы инкапсулировать ваши данные. Используя ваш пример, вы можете, например, инкапсулировать свой код и данные в родительский класс, который вы создаете в inprocess()вместо того, чтобы хранить его в общем пространстве имен, как статическое поле в классах или непосредственно в модуле. Таким образом, объект будет окончательно очищен сборщиком мусора:

class State:
    def __init__(self):
        self.integer: int = 0

    def do_stuff():
        self.integer += 42

def use_global_function(state):
    state.integer -= 1664
    state.do_stuff()

def inprocess(run: int) -> None:
    cp = current_process()
    state = State()
    print(f"[{run:2d} {cp.pid} {cp.name}] int: {state.integer}", flush=True)
    if state.integer != 0:
        raise Exception(f"[{run:2d} {cp.pid} {cp.name}] Variable already set!")
    state.integer = run + 1
    state.do_stuff()
    use_global_function(state)

Я столкнулся с некоторыми потенциально схожими проблемами и увидел несколько интересных сообщений в этом документе High Memory Usage Using Python Multiprocessing, который указывает на использование gc.collector(), однако в вашем случае это не сработало. Итак, я подумал о том, как был инициализирован класс Static, некоторые моменты:

  1. К сожалению, я не могу воспроизвести ваш минимальный пример, когда появляется сообщение об ошибке значения: ValueError: не удается найти контекст для 'fork'
  2. Учитывая 1, я использую set_start_method("spawn"). Быстрое исправление может заключаться в инициализации каждый раз статического класса, как показано ниже:
      {
    class Static:
        integer: int = 0
        def __init__(self):
            pass
    
    def inprocess(run: int) -> None:
        cp = current_process()
        # Print current state
        print(f"[{run:2d} {cp.pid} {cp.name}] int: {Static().integer}", flush=True)
    
        # Check value
        if Static().integer != 0:
            raise Exception(f"[{run:2d} {cp.pid} {cp.name}] Variable already set!")
    
        # Update value
        Static().integer = run + 1
    
    
    def pooling():
        cp = current_process()
        # Get master's pid
        print(f"[{cp.pid} {cp.name}] Start")
        with ProcessPoolExecutor(max_workers=2) as executor:
            for i, _ in enumerate(executor.map(inprocess, range(4))):
                print(f"run #{i} finished", flush=True)
    
    
    if __name__ == "__main__":
        print("start")
        # set_start_method("fork")  # enforce fork , ValueError: cannot find context for 'fork'
        set_start_method("spawn")    # Alternative
        pooling()
}

Это возвращает:

      [ 0 1424 SpawnProcess-2] int: 0
[ 1 1424 SpawnProcess-2] int: 0
run #0 finished
[ 2 17956 SpawnProcess-1] int: 0
[ 3 1424 SpawnProcess-2] int: 0
run #1 finished
run #2 finished
run #3 finished
Другие вопросы по тегам