Python Multiprocessing - завершить / перезапустить рабочий процесс

У меня есть несколько длительных процессов, которые я хотел бы разделить на несколько процессов. С этой частью я могу справиться без проблем. Проблема, с которой я сталкиваюсь, заключается в том, что иногда эти процессы переходят в зависшее состояние. Чтобы решить эту проблему, я хотел бы иметь возможность установить временной порог для каждой задачи, над которой работает процесс. Когда этот порог времени превышен, я хотел бы перезапустить или завершить задачу.

Первоначально мой код был очень простым с использованием пула процессов, однако с помощью пула я не мог понять, как извлечь процессы из пула, не говоря уже о том, как перезапустить / завершить процесс в пуле.

Я прибег к использованию очереди и обрабатываю объекты, как показано в этом примере ( https://pymotw.com/2/multiprocessing/communication.html с некоторыми изменениями.

Мои попытки понять это в коде ниже. В своем текущем состоянии процесс фактически не завершается. Кроме того, я не могу понять, как заставить процесс перейти к следующей задаче после завершения текущей задачи. Любые предложения / помощь приветствуются, возможно, я иду по этому поводу неправильно.

Спасибо

    import multiprocess
    import time

    class Consumer(multiprocess.Process):
        def __init__(self, task_queue, result_queue, startTimes, name=None):
            multiprocess.Process.__init__(self)
            if name:
                self.name = name
            print 'created process: {0}'.format(self.name)
            self.task_queue = task_queue
            self.result_queue = result_queue
            self.startTimes = startTimes

        def stopProcess(self):
            elapseTime = time.time() - self.startTimes[self.name]
            print 'killing process {0} {1}'.format(self.name, elapseTime)
            self.task_queue.cancel_join_thread()
            self.terminate()
            # now want to get the process to start procesing another job

        def run(self):
            '''
            The process subclass calls this on a separate process.
            '''    
            proc_name = self.name
            print proc_name
            while True:
                # pulling the next task off the queue and starting it
                # on the current process.
                task = self.task_queue.get()
                self.task_queue.cancel_join_thread()

                if task is None:
                    # Poison pill means shutdown
                    #print '%s: Exiting' % proc_name
                    self.task_queue.task_done()
                    break
                self.startTimes[proc_name] = time.time()
                answer = task()
                self.task_queue.task_done()
                self.result_queue.put(answer)
            return

    class Task(object):
        def __init__(self, a, b, startTimes):
            self.a = a
            self.b = b
            self.startTimes = startTimes
            self.taskName = 'taskName_{0}_{1}'.format(self.a, self.b)

        def __call__(self):
            import time
            import os

            print 'new job in process pid:', os.getpid(), self.taskName

            if self.a == 2:
                time.sleep(20000) # simulate a hung process
            else:
                time.sleep(3) # pretend to take some time to do the work
            return '%s * %s = %s' % (self.a, self.b, self.a * self.b)

        def __str__(self):
            return '%s * %s' % (self.a, self.b)

    if __name__ == '__main__':
        # Establish communication queues
        # tasks = this is the work queue and results is for results or completed work
        tasks = multiprocess.JoinableQueue()
        results = multiprocess.Queue()

        #parentPipe, childPipe = multiprocess.Pipe(duplex=True)
        mgr = multiprocess.Manager()
        startTimes = mgr.dict()

        # Start consumers
        numberOfProcesses = 4
        processObjs = []
        for processNumber in range(numberOfProcesses):
            processObj = Consumer(tasks, results, startTimes)
            processObjs.append(processObj)

        for process in processObjs:
            process.start()

        # Enqueue jobs
        num_jobs = 30
        for i in range(num_jobs):
            tasks.put(Task(i, i + 1, startTimes))

        # Add a poison pill for each process object
        for i in range(numberOfProcesses):
            tasks.put(None)

        # process monitor loop, 
        killProcesses = {}
        executing = True
        while executing:
            allDead = True
            for process in processObjs:
                name = process.name
                #status = consumer.status.getStatusString()
                status = process.is_alive()
                pid = process.ident
                elapsedTime = 0
                if name in startTimes:
                    elapsedTime = time.time() - startTimes[name]
                if elapsedTime > 10:
                    process.stopProcess()

                print "{0} - {1} - {2} - {3}".format(name, status, pid, elapsedTime)
                if  allDead and status:
                    allDead = False
            if allDead:
                executing = False
            time.sleep(3)

        # Wait for all of the tasks to finish
        #tasks.join()

        # Start printing results
        while num_jobs:
            result = results.get()
            print 'Result:', result
            num_jobs -= 1

3 ответа

Я вообще рекомендую против подклассов multiprocessing.Process как это приводит к коду, трудно читаемому.

Я бы предпочел инкапсулировать вашу логику в функцию и запустить ее в отдельном процессе. Это делает код намного чище и интуитивно понятнее.

Тем не менее, вместо того, чтобы изобретать велосипед, я бы порекомендовал вам использовать библиотеку, которая уже решает эту проблему для вас, например, Pebble или бильярд.

Например, библиотека Pebble позволяет легко устанавливать тайм-ауты для процессов, запущенных независимо или внутри Pool,

Запуск вашей функции в отдельном процессе с таймаутом:

from pebble import concurrent
from concurrent.futures import TimeoutError

@concurrent.process(timeout=10)
def function(foo, bar=0):
    return foo + bar

future = function(1, bar=2)

try:
    result = future.result()  # blocks until results are ready
except TimeoutError as error:
    print("Function took longer than %d seconds" % error.args[1])

Тот же пример, но с пулом процесса.

with ProcessPool(max_workers=5, max_tasks=10) as pool:
   future = pool.schedule(function, args=[1], timeout=10)

   try:
       result = future.result()  # blocks until results are ready
    except TimeoutError as error:
        print("Function took longer than %d seconds" % error.args[1])

В обоих случаях процесс тайм-аута будет автоматически прекращен для вас.

Более простым решением было бы продолжить использование, чем переопределение Pool заключается в том, чтобы разработать механизм тайм-аута выполняемой вами функции. Например:

from time import sleep
import signal

class TimeoutError(Exception):
    pass    

def handler(signum, frame):
    raise TimeoutError()

def run_with_timeout(func, *args, timeout=10, **kwargs):
    signal.signal(signal.SIGALRM, handler)
    signal.alarm(timeout)
    try:
        res = func(*args, **kwargs)
    except TimeoutError as exc:
        print("Timeout")
        res = exc
    finally:
        signal.alarm(0)
    return res


def test():
    sleep(4)
    print("ok")

if __name__ == "__main__":
    import multiprocessing as mp

    p = mp.Pool()
    print(p.apply_async(run_with_timeout, args=(test,),
                        kwds={"timeout":1}).get())

signal.alarm установить тайм-аут, и когда этот тайм-аут, он запускает обработчик, который останавливает выполнение вашей функции.

РЕДАКТИРОВАТЬ: Если вы используете систему Windows, это кажется немного сложнее, как signal не реализует SIGALRM, Другое решение - использовать Python API уровня C. Этот код был адаптирован из этого SO ответа с небольшой адаптацией для работы в 64-битной системе. Я только что протестировал его на Linux, но он должен работать так же на Windows.

import threading
import ctypes
from time import sleep


class TimeoutError(Exception):
    pass


def run_with_timeout(func, *args, timeout=10, **kwargs):
    interupt_tid = int(threading.get_ident())

    def interupt_thread():
        # Call the low level C python api using ctypes. tid must be converted 
        # to c_long to be valid.
        res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
            ctypes.c_long(interupt_tid), ctypes.py_object(TimeoutError))
        if res == 0:
            print(threading.enumerate())
            print(interupt_tid)
            raise ValueError("invalid thread id")
        elif res != 1:
            # "if it returns a number greater than one, you're in trouble,
            # and you should call it again with exc=NULL to revert the effect"
            ctypes.pythonapi.PyThreadState_SetAsyncExc(
                ctypes.c_long(interupt_tid), 0)
            raise SystemError("PyThreadState_SetAsyncExc failed")

    timer = threading.Timer(timeout, interupt_thread)
    try:
        timer.start()
        res = func(*args, **kwargs)
    except TimeoutError as exc:
        print("Timeout")
        res = exc
    else:
        timer.cancel()
    return res


def test():
    sleep(4)
    print("ok")


if __name__ == "__main__":
    import multiprocessing as mp

    p = mp.Pool()
    print(p.apply_async(run_with_timeout, args=(test,),
                        kwds={"timeout": 1}).get())
    print(p.apply_async(run_with_timeout, args=(test,),
                        kwds={"timeout": 5}).get())

Для долго выполняющихся процессов и/или длинных итераторов порожденные рабочие процессы могут зависнуть через некоторое время. Для предотвращения этого есть две встроенные техники:

  • Перезапустите рабочих после того, как они доставилиmaxtasksperchildзадачи из очереди.
  • Проходитьtimeoutкpool.imap.next(), поймать TimeoutError и закончить остальную работу в другом пуле.

Следующая оболочка реализует и то, и другое в качестве генератора. Это также работает при замене stdlibmultiprocessingсmultiprocess.

      import multiprocessing as mp


def imap(
    func,
    iterable,
    *,
    processes=None,
    maxtasksperchild=42,
    timeout=42,
    initializer=None,
    initargs=(),
    context=mp.get_context("spawn")
):
    """Multiprocessing imap, restarting workers after maxtasksperchild tasks to avoid zombies.

    Example:
        >>> list(imap(str, range(5)))
        ['0', '1', '2', '3', '4']

    Raises:
        mp.TimeoutError: if the next result cannot be returned within timeout seconds.

    Yields:
        Ordered results as they come in.
    """
    with context.Pool(
        processes=processes,
        maxtasksperchild=maxtasksperchild,
        initializer=initializer,
        initargs=initargs,
    ) as pool:
        it = pool.imap(func, iterable)
        while True:
            try:
                yield it.next(timeout)
            except StopIteration:
                return

Чтобы поймать TimeoutError:

      >>> import time
>>> iterable = list(range(10))
>>> results = []
>>> try:
...     for i, result in enumerate(imap(time.sleep, iterable, processes=2, timeout=2)):
...         results.append(result)
... except mp.TimeoutError:
...     print("Failed to process the following subset of iterable:", iterable[i:])
Failed to process the following subset of iterable: [2, 3, 4, 5, 6, 7, 8, 9]
Другие вопросы по тегам