Python Multiprocessing - завершить / перезапустить рабочий процесс
У меня есть несколько длительных процессов, которые я хотел бы разделить на несколько процессов. С этой частью я могу справиться без проблем. Проблема, с которой я сталкиваюсь, заключается в том, что иногда эти процессы переходят в зависшее состояние. Чтобы решить эту проблему, я хотел бы иметь возможность установить временной порог для каждой задачи, над которой работает процесс. Когда этот порог времени превышен, я хотел бы перезапустить или завершить задачу.
Первоначально мой код был очень простым с использованием пула процессов, однако с помощью пула я не мог понять, как извлечь процессы из пула, не говоря уже о том, как перезапустить / завершить процесс в пуле.
Я прибег к использованию очереди и обрабатываю объекты, как показано в этом примере ( https://pymotw.com/2/multiprocessing/communication.html с некоторыми изменениями.
Мои попытки понять это в коде ниже. В своем текущем состоянии процесс фактически не завершается. Кроме того, я не могу понять, как заставить процесс перейти к следующей задаче после завершения текущей задачи. Любые предложения / помощь приветствуются, возможно, я иду по этому поводу неправильно.
Спасибо
import multiprocess
import time
class Consumer(multiprocess.Process):
def __init__(self, task_queue, result_queue, startTimes, name=None):
multiprocess.Process.__init__(self)
if name:
self.name = name
print 'created process: {0}'.format(self.name)
self.task_queue = task_queue
self.result_queue = result_queue
self.startTimes = startTimes
def stopProcess(self):
elapseTime = time.time() - self.startTimes[self.name]
print 'killing process {0} {1}'.format(self.name, elapseTime)
self.task_queue.cancel_join_thread()
self.terminate()
# now want to get the process to start procesing another job
def run(self):
'''
The process subclass calls this on a separate process.
'''
proc_name = self.name
print proc_name
while True:
# pulling the next task off the queue and starting it
# on the current process.
task = self.task_queue.get()
self.task_queue.cancel_join_thread()
if task is None:
# Poison pill means shutdown
#print '%s: Exiting' % proc_name
self.task_queue.task_done()
break
self.startTimes[proc_name] = time.time()
answer = task()
self.task_queue.task_done()
self.result_queue.put(answer)
return
class Task(object):
def __init__(self, a, b, startTimes):
self.a = a
self.b = b
self.startTimes = startTimes
self.taskName = 'taskName_{0}_{1}'.format(self.a, self.b)
def __call__(self):
import time
import os
print 'new job in process pid:', os.getpid(), self.taskName
if self.a == 2:
time.sleep(20000) # simulate a hung process
else:
time.sleep(3) # pretend to take some time to do the work
return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
def __str__(self):
return '%s * %s' % (self.a, self.b)
if __name__ == '__main__':
# Establish communication queues
# tasks = this is the work queue and results is for results or completed work
tasks = multiprocess.JoinableQueue()
results = multiprocess.Queue()
#parentPipe, childPipe = multiprocess.Pipe(duplex=True)
mgr = multiprocess.Manager()
startTimes = mgr.dict()
# Start consumers
numberOfProcesses = 4
processObjs = []
for processNumber in range(numberOfProcesses):
processObj = Consumer(tasks, results, startTimes)
processObjs.append(processObj)
for process in processObjs:
process.start()
# Enqueue jobs
num_jobs = 30
for i in range(num_jobs):
tasks.put(Task(i, i + 1, startTimes))
# Add a poison pill for each process object
for i in range(numberOfProcesses):
tasks.put(None)
# process monitor loop,
killProcesses = {}
executing = True
while executing:
allDead = True
for process in processObjs:
name = process.name
#status = consumer.status.getStatusString()
status = process.is_alive()
pid = process.ident
elapsedTime = 0
if name in startTimes:
elapsedTime = time.time() - startTimes[name]
if elapsedTime > 10:
process.stopProcess()
print "{0} - {1} - {2} - {3}".format(name, status, pid, elapsedTime)
if allDead and status:
allDead = False
if allDead:
executing = False
time.sleep(3)
# Wait for all of the tasks to finish
#tasks.join()
# Start printing results
while num_jobs:
result = results.get()
print 'Result:', result
num_jobs -= 1
3 ответа
Я вообще рекомендую против подклассов multiprocessing.Process
как это приводит к коду, трудно читаемому.
Я бы предпочел инкапсулировать вашу логику в функцию и запустить ее в отдельном процессе. Это делает код намного чище и интуитивно понятнее.
Тем не менее, вместо того, чтобы изобретать велосипед, я бы порекомендовал вам использовать библиотеку, которая уже решает эту проблему для вас, например, Pebble или бильярд.
Например, библиотека Pebble позволяет легко устанавливать тайм-ауты для процессов, запущенных независимо или внутри Pool
,
Запуск вашей функции в отдельном процессе с таймаутом:
from pebble import concurrent
from concurrent.futures import TimeoutError
@concurrent.process(timeout=10)
def function(foo, bar=0):
return foo + bar
future = function(1, bar=2)
try:
result = future.result() # blocks until results are ready
except TimeoutError as error:
print("Function took longer than %d seconds" % error.args[1])
Тот же пример, но с пулом процесса.
with ProcessPool(max_workers=5, max_tasks=10) as pool:
future = pool.schedule(function, args=[1], timeout=10)
try:
result = future.result() # blocks until results are ready
except TimeoutError as error:
print("Function took longer than %d seconds" % error.args[1])
В обоих случаях процесс тайм-аута будет автоматически прекращен для вас.
Более простым решением было бы продолжить использование, чем переопределение Pool
заключается в том, чтобы разработать механизм тайм-аута выполняемой вами функции. Например:
from time import sleep
import signal
class TimeoutError(Exception):
pass
def handler(signum, frame):
raise TimeoutError()
def run_with_timeout(func, *args, timeout=10, **kwargs):
signal.signal(signal.SIGALRM, handler)
signal.alarm(timeout)
try:
res = func(*args, **kwargs)
except TimeoutError as exc:
print("Timeout")
res = exc
finally:
signal.alarm(0)
return res
def test():
sleep(4)
print("ok")
if __name__ == "__main__":
import multiprocessing as mp
p = mp.Pool()
print(p.apply_async(run_with_timeout, args=(test,),
kwds={"timeout":1}).get())
signal.alarm
установить тайм-аут, и когда этот тайм-аут, он запускает обработчик, который останавливает выполнение вашей функции.
РЕДАКТИРОВАТЬ: Если вы используете систему Windows, это кажется немного сложнее, как signal
не реализует SIGALRM
, Другое решение - использовать Python API уровня C. Этот код был адаптирован из этого SO ответа с небольшой адаптацией для работы в 64-битной системе. Я только что протестировал его на Linux, но он должен работать так же на Windows.
import threading
import ctypes
from time import sleep
class TimeoutError(Exception):
pass
def run_with_timeout(func, *args, timeout=10, **kwargs):
interupt_tid = int(threading.get_ident())
def interupt_thread():
# Call the low level C python api using ctypes. tid must be converted
# to c_long to be valid.
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(interupt_tid), ctypes.py_object(TimeoutError))
if res == 0:
print(threading.enumerate())
print(interupt_tid)
raise ValueError("invalid thread id")
elif res != 1:
# "if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"
ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(interupt_tid), 0)
raise SystemError("PyThreadState_SetAsyncExc failed")
timer = threading.Timer(timeout, interupt_thread)
try:
timer.start()
res = func(*args, **kwargs)
except TimeoutError as exc:
print("Timeout")
res = exc
else:
timer.cancel()
return res
def test():
sleep(4)
print("ok")
if __name__ == "__main__":
import multiprocessing as mp
p = mp.Pool()
print(p.apply_async(run_with_timeout, args=(test,),
kwds={"timeout": 1}).get())
print(p.apply_async(run_with_timeout, args=(test,),
kwds={"timeout": 5}).get())
Для долго выполняющихся процессов и/или длинных итераторов порожденные рабочие процессы могут зависнуть через некоторое время. Для предотвращения этого есть две встроенные техники:
- Перезапустите рабочих после того, как они доставили
maxtasksperchild
задачи из очереди. - Проходить
timeout
кpool.imap.next()
, поймать TimeoutError и закончить остальную работу в другом пуле.
Следующая оболочка реализует и то, и другое в качестве генератора. Это также работает при замене stdlibmultiprocessing
сmultiprocess
.
import multiprocessing as mp
def imap(
func,
iterable,
*,
processes=None,
maxtasksperchild=42,
timeout=42,
initializer=None,
initargs=(),
context=mp.get_context("spawn")
):
"""Multiprocessing imap, restarting workers after maxtasksperchild tasks to avoid zombies.
Example:
>>> list(imap(str, range(5)))
['0', '1', '2', '3', '4']
Raises:
mp.TimeoutError: if the next result cannot be returned within timeout seconds.
Yields:
Ordered results as they come in.
"""
with context.Pool(
processes=processes,
maxtasksperchild=maxtasksperchild,
initializer=initializer,
initargs=initargs,
) as pool:
it = pool.imap(func, iterable)
while True:
try:
yield it.next(timeout)
except StopIteration:
return
Чтобы поймать TimeoutError:
>>> import time
>>> iterable = list(range(10))
>>> results = []
>>> try:
... for i, result in enumerate(imap(time.sleep, iterable, processes=2, timeout=2)):
... results.append(result)
... except mp.TimeoutError:
... print("Failed to process the following subset of iterable:", iterable[i:])
Failed to process the following subset of iterable: [2, 3, 4, 5, 6, 7, 8, 9]