Python: concurrent.futures Как сделать его отменяемым?

Python concurrent.futures и ProcessPoolExecutor предоставляют удобный интерфейс для планирования и мониторинга задач. Фьючерсы даже предоставляют метод.cancel ():

отмена (): попытка отменить вызов. Если вызов в данный момент выполняется и не может быть отменен, метод вернет значение False, в противном случае вызов будет отменен, а метод вернет значение True.

К сожалению, в простом вопросе (касающемся asyncio) в ответе утверждается, что выполняемые задачи невозможно отменить с помощью этого фрагмента документации, но в документах об этом не говорится, только если они выполняются и не могут быть отменены.

Отправка multiprocessing.Events в процессы также не является тривиально возможной (это делается с помощью параметров, как в multiprocess.Process возвращает RuntimeError)

Что я пытаюсь сделать? Я хотел бы разбить пространство поиска и запустить задачу для каждого раздела. Но для этого достаточно ОДНОГО решения, и этот процесс требует значительных ресурсов процессора. Так есть ли действительно удобный способ сделать это, который не компенсирует выгоды с помощью ProcessPool для начала?

Пример:

from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait

# function that profits from partitioned search space
def m_run(partition):
    for elem in partition:
        if elem == 135135515:
            return elem
    return False

futures = []
# used to create the partitions
steps = 100000000
with ProcessPoolExecutor(max_workers=4) as pool:
    for i in range(4):
        # run 4 tasks with a partition, but only *one* solution is needed
        partition = range(i*steps,(i+1)*steps)
        futures.append(pool.submit(m_run, partition))

    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    for d in done:
        print(d.result())

    print("---")
    for d in not_done:
        # will return false for Cancel and Result for all futures
        print("Cancel: "+str(d.cancel()))
        print("Result: "+str(d.result()))

4 ответа

К сожалению, работает Futures не может быть отменено Я полагаю, что основная причина заключается в том, чтобы обеспечить один и тот же API в разных реализациях (невозможно прервать выполнение потоков или сопрограмм).

Библиотека Pebble была разработана для преодоления этого и других ограничений.

from pebble import ProcessPool

def function(foo, bar=0):
    return foo + bar

with ProcessPool() as pool:
    future = pool.schedule(function, args=[1])

    # if running, the container process will be terminated 
    # a new process will be started consuming the next task
    future.cancel()  

Я не знаю почему concurrent.futures.Future не имеет .kill() метод, но вы можете достичь того, что вы хотите, закрыв пул процессов с pool.shutdown(wait=False)и уничтожение оставшихся дочерних процессов вручную.

Создайте функцию для уничтожения дочерних процессов:

import signal, psutil

def kill_child_processes(parent_pid, sig=signal.SIGTERM):
    try:
        parent = psutil.Process(parent_pid)
    except psutil.NoSuchProcess:
        return
    children = parent.children(recursive=True)
    for process in children:
        process.send_signal(sig)

Запускайте свой код, пока не получите первый результат, а затем убейте все оставшиеся дочерние процессы:

from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait

# function that profits from partitioned search space
def m_run(partition):
    for elem in partition:
        if elem == 135135515:
            return elem
    return False

futures = []
# used to create the partitions
steps = 100000000
pool = ProcessPoolExecutor(max_workers=4)
for i in range(4):
    # run 4 tasks with a partition, but only *one* solution is needed
    partition = range(i*steps,(i+1)*steps)
    futures.append(pool.submit(m_run, partition))

done, not_done = wait(futures, timeout=3600, return_when=FIRST_COMPLETED)

# Shut down pool
pool.shutdown(wait=False)

# Kill remaining child processes
kill_child_processes(os.getpid())

Я нашел ваш вопрос интересным, так что вот мой вывод.

Я нашел поведение .cancel() метод, как указано в документации по Python. Что касается выполняемых вами параллельных функций, к сожалению, их нельзя было отменить даже после того, как им было сказано это сделать. Если мои выводы верны, то я считаю, что Python требует более эффективного метода.cancel().

Запустите код ниже, чтобы проверить мои выводы.

from concurrent.futures import ProcessPoolExecutor, as_completed
from time import time 

# function that profits from partitioned search space
def m_run(partition):
    for elem in partition:
        if elem == 3351355150:
            return elem
            break #Added to terminate loop once found
    return False

start = time()
futures = []
# used to create the partitions
steps = 1000000000
with ProcessPoolExecutor(max_workers=4) as pool:
    for i in range(4):
        # run 4 tasks with a partition, but only *one* solution is needed
        partition = range(i*steps,(i+1)*steps)
        futures.append(pool.submit(m_run, partition))

    ### New Code: Start ### 
    for f in as_completed(futures):
        print(f.result())
        if f.result():
            print('break')
            break

    for f in futures:
        print(f, 'running?',f.running())
        if f.running():
            f.cancel()
            print('Cancelled? ',f.cancelled())

    print('New Instruction Ended at = ', time()-start )
print('Total Compute Time = ', time()-start )

Обновление: можно принудительно завершить параллельные процессы через bash, но в результате основная программа на python также будет завершена. Если это не проблема для вас, попробуйте следующий код.

Вы должны добавить приведенные ниже коды между двумя последними операторами печати, чтобы убедиться в этом. Примечание. Этот код работает только в том случае, если вы не запускаете какую-либо другую программу на python3.

import subprocess, os, signal 
result = subprocess.run(['ps', '-C', 'python3', '-o', 'pid='],
                        stdout=subprocess.PIPE).stdout.decode('utf-8').split()
print ('result =', result)
for i in result:
    print('PID = ', i)
    if i != result[0]:
        os.kill(int(i), signal.SIGKILL)
        try: 
           os.kill(int(i), 0)
           raise Exception("""wasn't able to kill the process 
                              HINT:use signal.SIGKILL or signal.SIGABORT""")
        except OSError as ex:
           continue

Для одного из моих программ мне нужно было убить все запущенные процессы моего исполнителя при вызове.shutdown()метод. Как это было невозможно со стандартомProcessPoolExecutor, я сделал это сам. Я также исправил неправильное состояние выполнения, сообщаемое будущими объектами:

      import sys
import functools
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures._base import RUNNING


def _callable_wrapper(is_running, fn, *args, **kwargs):
    is_running.value = True
    fn(*args, **kwargs)
    is_running.value = False


def _future_running_override(future, is_running):
    return future._state == RUNNING and is_running.value


class StoppableProcessPoolExecutor(ProcessPoolExecutor):
    """A concurrent.futures.ProcessPoolExecutor that kills running processes on
    shutdown.
    This also fix the wrong running state of futures. See
    https://bugs.python.org/issue37276
    """

    def __init__(self, *args, **kwargs):
        self._state_manager = multiprocessing.Manager()
        ProcessPoolExecutor.__init__(self, *args, **kwargs)

    def shutdown(self, *args, **kwargs):
        processes = self._processes

        # Python < 3.9: We should wait else we got an OSError:
        # https://bugs.python.org/issue36281
        if sys.version_info.major >= 3 and sys.version_info.minor < 9:
            kwargs["wait"] = True

        for pid, process in processes.items():
            process.kill()
        ProcessPoolExecutor.shutdown(self, *args, **kwargs)
        self._state_manager.shutdown()

    shutdown.__doc__ = ProcessPoolExecutor.shutdown.__doc__

    def submit(self, fn, *args, **kwargs):
        is_running = self._state_manager.Value(bool, False)
        future = ProcessPoolExecutor.submit(
            self,
            functools.partial(_callable_wrapper, is_running, fn),
            *args,
            **kwargs,
        )
        # Monkey patch future.running to return the real running state
        future.running = functools.partial(_future_running_override, future, is_running)
        return future

    submit.__doc__ = ProcessPoolExecutor.submit.__doc__

Оригинальный источник: https://github.com/flozz/yoga-image-optimizer/blob/master/yoga_image_optimizer/stoppable_process_pool_executor.py

Если это может помочь кому-то... :)

Другие вопросы по тегам