Python: concurrent.futures Как сделать его отменяемым?
Python concurrent.futures и ProcessPoolExecutor предоставляют удобный интерфейс для планирования и мониторинга задач. Фьючерсы даже предоставляют метод.cancel ():
отмена (): попытка отменить вызов. Если вызов в данный момент выполняется и не может быть отменен, метод вернет значение False, в противном случае вызов будет отменен, а метод вернет значение True.
К сожалению, в простом вопросе (касающемся asyncio) в ответе утверждается, что выполняемые задачи невозможно отменить с помощью этого фрагмента документации, но в документах об этом не говорится, только если они выполняются и не могут быть отменены.
Отправка multiprocessing.Events в процессы также не является тривиально возможной (это делается с помощью параметров, как в multiprocess.Process возвращает RuntimeError)
Что я пытаюсь сделать? Я хотел бы разбить пространство поиска и запустить задачу для каждого раздела. Но для этого достаточно ОДНОГО решения, и этот процесс требует значительных ресурсов процессора. Так есть ли действительно удобный способ сделать это, который не компенсирует выгоды с помощью ProcessPool для начала?
Пример:
from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait
# function that profits from partitioned search space
def m_run(partition):
for elem in partition:
if elem == 135135515:
return elem
return False
futures = []
# used to create the partitions
steps = 100000000
with ProcessPoolExecutor(max_workers=4) as pool:
for i in range(4):
# run 4 tasks with a partition, but only *one* solution is needed
partition = range(i*steps,(i+1)*steps)
futures.append(pool.submit(m_run, partition))
done, not_done = wait(futures, return_when=FIRST_COMPLETED)
for d in done:
print(d.result())
print("---")
for d in not_done:
# will return false for Cancel and Result for all futures
print("Cancel: "+str(d.cancel()))
print("Result: "+str(d.result()))
4 ответа
К сожалению, работает Futures
не может быть отменено Я полагаю, что основная причина заключается в том, чтобы обеспечить один и тот же API в разных реализациях (невозможно прервать выполнение потоков или сопрограмм).
Библиотека Pebble была разработана для преодоления этого и других ограничений.
from pebble import ProcessPool
def function(foo, bar=0):
return foo + bar
with ProcessPool() as pool:
future = pool.schedule(function, args=[1])
# if running, the container process will be terminated
# a new process will be started consuming the next task
future.cancel()
Я не знаю почему concurrent.futures.Future
не имеет .kill()
метод, но вы можете достичь того, что вы хотите, закрыв пул процессов с pool.shutdown(wait=False)
и уничтожение оставшихся дочерних процессов вручную.
Создайте функцию для уничтожения дочерних процессов:
import signal, psutil
def kill_child_processes(parent_pid, sig=signal.SIGTERM):
try:
parent = psutil.Process(parent_pid)
except psutil.NoSuchProcess:
return
children = parent.children(recursive=True)
for process in children:
process.send_signal(sig)
Запускайте свой код, пока не получите первый результат, а затем убейте все оставшиеся дочерние процессы:
from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait
# function that profits from partitioned search space
def m_run(partition):
for elem in partition:
if elem == 135135515:
return elem
return False
futures = []
# used to create the partitions
steps = 100000000
pool = ProcessPoolExecutor(max_workers=4)
for i in range(4):
# run 4 tasks with a partition, but only *one* solution is needed
partition = range(i*steps,(i+1)*steps)
futures.append(pool.submit(m_run, partition))
done, not_done = wait(futures, timeout=3600, return_when=FIRST_COMPLETED)
# Shut down pool
pool.shutdown(wait=False)
# Kill remaining child processes
kill_child_processes(os.getpid())
Я нашел ваш вопрос интересным, так что вот мой вывод.
Я нашел поведение .cancel()
метод, как указано в документации по Python. Что касается выполняемых вами параллельных функций, к сожалению, их нельзя было отменить даже после того, как им было сказано это сделать. Если мои выводы верны, то я считаю, что Python требует более эффективного метода.cancel().
Запустите код ниже, чтобы проверить мои выводы.
from concurrent.futures import ProcessPoolExecutor, as_completed
from time import time
# function that profits from partitioned search space
def m_run(partition):
for elem in partition:
if elem == 3351355150:
return elem
break #Added to terminate loop once found
return False
start = time()
futures = []
# used to create the partitions
steps = 1000000000
with ProcessPoolExecutor(max_workers=4) as pool:
for i in range(4):
# run 4 tasks with a partition, but only *one* solution is needed
partition = range(i*steps,(i+1)*steps)
futures.append(pool.submit(m_run, partition))
### New Code: Start ###
for f in as_completed(futures):
print(f.result())
if f.result():
print('break')
break
for f in futures:
print(f, 'running?',f.running())
if f.running():
f.cancel()
print('Cancelled? ',f.cancelled())
print('New Instruction Ended at = ', time()-start )
print('Total Compute Time = ', time()-start )
Обновление: можно принудительно завершить параллельные процессы через bash, но в результате основная программа на python также будет завершена. Если это не проблема для вас, попробуйте следующий код.
Вы должны добавить приведенные ниже коды между двумя последними операторами печати, чтобы убедиться в этом. Примечание. Этот код работает только в том случае, если вы не запускаете какую-либо другую программу на python3.
import subprocess, os, signal
result = subprocess.run(['ps', '-C', 'python3', '-o', 'pid='],
stdout=subprocess.PIPE).stdout.decode('utf-8').split()
print ('result =', result)
for i in result:
print('PID = ', i)
if i != result[0]:
os.kill(int(i), signal.SIGKILL)
try:
os.kill(int(i), 0)
raise Exception("""wasn't able to kill the process
HINT:use signal.SIGKILL or signal.SIGABORT""")
except OSError as ex:
continue
Для одного из моих программ мне нужно было убить все запущенные процессы моего исполнителя при вызове.shutdown()
метод. Как это было невозможно со стандартомProcessPoolExecutor
, я сделал это сам. Я также исправил неправильное состояние выполнения, сообщаемое будущими объектами:
import sys
import functools
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures._base import RUNNING
def _callable_wrapper(is_running, fn, *args, **kwargs):
is_running.value = True
fn(*args, **kwargs)
is_running.value = False
def _future_running_override(future, is_running):
return future._state == RUNNING and is_running.value
class StoppableProcessPoolExecutor(ProcessPoolExecutor):
"""A concurrent.futures.ProcessPoolExecutor that kills running processes on
shutdown.
This also fix the wrong running state of futures. See
https://bugs.python.org/issue37276
"""
def __init__(self, *args, **kwargs):
self._state_manager = multiprocessing.Manager()
ProcessPoolExecutor.__init__(self, *args, **kwargs)
def shutdown(self, *args, **kwargs):
processes = self._processes
# Python < 3.9: We should wait else we got an OSError:
# https://bugs.python.org/issue36281
if sys.version_info.major >= 3 and sys.version_info.minor < 9:
kwargs["wait"] = True
for pid, process in processes.items():
process.kill()
ProcessPoolExecutor.shutdown(self, *args, **kwargs)
self._state_manager.shutdown()
shutdown.__doc__ = ProcessPoolExecutor.shutdown.__doc__
def submit(self, fn, *args, **kwargs):
is_running = self._state_manager.Value(bool, False)
future = ProcessPoolExecutor.submit(
self,
functools.partial(_callable_wrapper, is_running, fn),
*args,
**kwargs,
)
# Monkey patch future.running to return the real running state
future.running = functools.partial(_future_running_override, future, is_running)
return future
submit.__doc__ = ProcessPoolExecutor.submit.__doc__
Оригинальный источник: https://github.com/flozz/yoga-image-optimizer/blob/master/yoga_image_optimizer/stoppable_process_pool_executor.py
Если это может помочь кому-то... :)