Прерывания клавиатуры с помощью многопроцессорного пула Python
Как мне обработать события KeyboardInterrupt с помощью многопроцессорных пулов Python? Вот простой пример:
from multiprocessing import Pool
from time import sleep
from sys import exit
def slowly_square(i):
sleep(1)
return i*i
def go():
pool = Pool(8)
try:
results = pool.map(slowly_square, range(40))
except KeyboardInterrupt:
# **** THIS PART NEVER EXECUTES. ****
pool.terminate()
print "You cancelled the program!"
sys.exit(1)
print "\nFinally, here are the results: ", results
if __name__ == "__main__":
go()
При запуске кода выше, KeyboardInterrupt
поднимается, когда я нажимаю ^C
, но процесс просто зависает в этот момент, и я должен убить его извне.
Я хочу быть в состоянии нажать ^C
в любое время и изящно завершить все процессы.
11 ответов
Это ошибка Python. При ожидании условия в threading.Condition.wait() KeyboardInterrupt никогда не отправляется. Репро:
import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"
Исключение KeyboardInterrupt не будет доставлено до тех пор, пока wait () не вернется, и никогда не вернется, поэтому прерывание никогда не происходит. KeyboardInterrupt почти наверняка должен прервать ожидание условия.
Обратите внимание, что это не происходит, если указан тайм-аут; cond.wait(1) немедленно получит прерывание. Таким образом, обходной путь должен указать время ожидания. Для этого замените
results = pool.map(slowly_square, range(40))
с
results = pool.map_async(slowly_square, range(40)).get(9999999)
или похожие.
Из того, что я недавно обнаружил, лучшее решение - настроить рабочие процессы на полное игнорирование SIGINT и ограничить весь код очистки родительским процессом. Это устраняет проблему как для незанятых, так и для занятых рабочих процессов и не требует кода обработки ошибок в ваших дочерних процессах.
import signal
...
def init_worker():
signal.signal(signal.SIGINT, signal.SIG_IGN)
...
def main()
pool = multiprocessing.Pool(size, init_worker)
...
except KeyboardInterrupt:
pool.terminate()
pool.join()
Объяснение и полный пример кода можно найти по адресу http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/ и http://github.com/jreese/multiprocessing-keyboardinterrupt соответственно.
По некоторым причинам только исключения унаследованы от базовой Exception
класс обрабатываются нормально. В качестве обходного пути, вы можете повторно поднять свой KeyboardInterrupt
как Exception
пример:
from multiprocessing import Pool
import time
class KeyboardInterruptError(Exception): pass
def f(x):
try:
time.sleep(x)
return x
except KeyboardInterrupt:
raise KeyboardInterruptError()
def main():
p = Pool(processes=4)
try:
print 'starting the pool map'
print p.map(f, range(10))
p.close()
print 'pool map complete'
except KeyboardInterrupt:
print 'got ^C while pool mapping, terminating the pool'
p.terminate()
print 'pool is terminated'
except Exception, e:
print 'got exception: %r, terminating the pool' % (e,)
p.terminate()
print 'pool is terminated'
finally:
print 'joining pool processes'
p.join()
print 'join complete'
print 'the end'
if __name__ == '__main__':
main()
Обычно вы получите следующий вывод:
staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end
Так что, если вы нажмете ^C
, ты получишь:
staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end
Голосованный ответ не решает основную проблему, но похожий побочный эффект.
Джесси Ноллер, автор многопроцессорной библиотеки, объясняет, как правильно обращаться с CTRL+C при использовании multiprocessing.Pool
в старом сообщении в блоге.
import signal
from multiprocessing import Pool
def initializer():
"""Ignore CTRL+C in the worker process."""
signal.signal(signal.SIGINT, signal.SIG_IGN)
pool = Pool(initializer=initializer)
try:
pool.map(perform_download, dowloads)
except KeyboardInterrupt:
pool.terminate()
pool.join()
Многие из этих ответов устарели и / или они, похоже, не работают с более поздними версиями Python (я использую 3.8.5) в Windows, если вы выполняете такой метод, как, который блокируется до тех пор, пока все отправленные задачи не будут завершены . Вот мое решение.
Позвоните в
в каждом процессе в пуле процессов, чтобы полностью игнорировать прерывание и оставить обработку главному процессу. Метод использования
(или ) вместо который лениво оценивает ваш повторяемый аргумент для отправки задач и обработки результатов. Таким образом, он (а) не блокирует ожидание всех результатов, и в качестве побочного преимущества (б) вы можете сэкономить память, так как итерируемый объект теперь может быть функцией или выражением генератора. Ключ состоит в том, чтобы основной процесс периодически и часто выпускал распечатанные отчеты, например, сообщая о ходе выполнения представленных задач. Это необходимо для распознавания прерывания клавиатуры. В приведенном ниже коде счетчик количества выполненных задач печатается по мере выполнения каждых дополнительных N задач, где N равно 100. Идея состоит в том, чтобы выбрать N на основе вашей индивидуальной рабочей функции, чтобы сообщение о количестве выполненных задач печаталось достаточно часто, чтобы что вам не нужно слишком долго ждать, пока прерывание вступит в силу после нажатия Ctrl-c. Конечно, вы также можете использовать индикатор выполнения, такой как тот, который предоставляется
пакет доступен на репозиторий.
from multiprocessing import Pool
import signal
def init_pool():
signal.signal(signal.SIGINT, signal.SIG_IGN)
def worker(x):
import time
# processes the number
time.sleep(.2)
if __name__ == '__main__':
with Pool(initializer=init_pool) as pool:
try:
tasks_completed = 0
result = []
for return_value in pool.imap(worker, range(1000)):
tasks_completed += 1
if tasks_completed % 10 == 0:
print('Tasks completed =', tasks_completed, end='\r')
result.append(return_value)
except KeyboardInterrupt:
print('\nCtrl-c entered.')
else:
print()
Обычно эта простая структура работает для Ctrl-C в пуле:
def signal_handle(_signal, frame):
print "Stopping the Jobs."
signal.signal(signal.SIGINT, signal_handle)
Как было сказано в нескольких похожих постах:
Я новичок в Python. Я всюду искал ответ и наткнулся на этот и несколько других блогов и видео на YouTube. Я попытался скопировать вставить код автора выше и воспроизвести его на моем Python 2.7.13 в Windows 7 64-битной. Это близко к тому, чего я хочу достичь.
Я заставил свои дочерние процессы игнорировать ControlC и заставить родительский процесс завершаться. Похоже, что обход дочернего процесса помогает избежать этой проблемы для меня.
#!/usr/bin/python
from multiprocessing import Pool
from time import sleep
from sys import exit
def slowly_square(i):
try:
print "<slowly_square> Sleeping and later running a square calculation..."
sleep(1)
return i * i
except KeyboardInterrupt:
print "<child processor> Don't care if you say CtrlC"
pass
def go():
pool = Pool(8)
try:
results = pool.map(slowly_square, range(40))
except KeyboardInterrupt:
pool.terminate()
pool.close()
print "You cancelled the program!"
exit(1)
print "Finally, here are the results", results
if __name__ == '__main__':
go()
Часть начинается с pool.terminate()
никогда не кажется выполненным.
Кажется, есть две проблемы, которые делают исключения при раздражающей многопроцессорности. Первое (отмечено Гленном) - это то, что вам нужно использовать map_async
с таймаутом вместо map
чтобы получить немедленный ответ (т. е. не заканчивать обработку всего списка). Второе (замечено Андреем) состоит в том, что многопроцессорность не перехватывает исключения, которые не наследуются от Exception
(например, SystemExit
). Итак, вот мое решение, которое касается обоих из них:
import sys
import functools
import traceback
import multiprocessing
def _poolFunctionWrapper(function, arg):
"""Run function under the pool
Wrapper around function to catch exceptions that don't inherit from
Exception (which aren't caught by multiprocessing, so that you end
up hitting the timeout).
"""
try:
return function(arg)
except:
cls, exc, tb = sys.exc_info()
if issubclass(cls, Exception):
raise # No worries
# Need to wrap the exception with something multiprocessing will recognise
import traceback
print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))
def _runPool(pool, timeout, function, iterable):
"""Run the pool
Wrapper around pool.map_async, to handle timeout. This is required so as to
trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
http://stackru.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool
Further wraps the function in _poolFunctionWrapper to catch exceptions
that don't inherit from Exception.
"""
return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)
def myMap(function, iterable, numProcesses=1, timeout=9999):
"""Run the function on the iterable, optionally with multiprocessing"""
if numProcesses > 1:
pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
mapFunc = functools.partial(_runPool, pool, timeout)
else:
pool = None
mapFunc = map
results = mapFunc(function, iterable)
if pool is not None:
pool.close()
pool.join()
return results
Я обнаружил, что на данный момент лучшее решение - не использовать функцию multiprocessing.pool, а использовать собственную функцию пула. Я представил пример, демонстрирующий ошибку с apply_async, а также пример, показывающий, как вообще не использовать функциональность пула.
http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/
Вы можете попробовать использовать метод apply_async объекта Pool, например так:
import multiprocessing
import time
from datetime import datetime
def test_func(x):
time.sleep(2)
return x**2
def apply_multiprocessing(input_list, input_function):
pool_size = 5
pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)
try:
jobs = {}
for value in input_list:
jobs[value] = pool.apply_async(input_function, [value])
results = {}
for value, result in jobs.items():
try:
results[value] = result.get()
except KeyboardInterrupt:
print "Interrupted by user"
pool.terminate()
break
except Exception as e:
results[value] = e
return results
except Exception:
raise
finally:
pool.close()
pool.join()
if __name__ == "__main__":
iterations = range(100)
t0 = datetime.now()
results1 = apply_multiprocessing(iterations, test_func)
t1 = datetime.now()
print results1
print "Multi: {}".format(t1 - t0)
t2 = datetime.now()
results2 = {i: test_func(i) for i in iterations}
t3 = datetime.now()
print results2
print "Non-multi: {}".format(t3 - t2)
Выход:
100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000
Преимущество этого метода в том, что результаты, обработанные до прерывания, будут возвращены в словарь результатов:
>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
Как ни странно, похоже, что вы должны справиться с KeyboardInterrupt
у детей тоже. Я бы ожидал, что это будет работать как написано... попробуйте изменить slowly_square
чтобы:
def slowly_square(i):
try:
sleep(1)
return i * i
except KeyboardInterrupt:
print 'You EVIL bastard!'
return 0
Это должно работать, как вы ожидали.