Как ограничить время выполнения вызова функции в Python

В моем коде есть вызов функции, связанной с сокетом, эта функция из другого модуля, таким образом, вне моего контроля, проблема в том, что она иногда блокируется на несколько часов, что совершенно неприемлемо. Как я могу ограничить время выполнения функции из своего кода? Я думаю, что решение должно использовать другой поток.

13 ответов

Решение

Я не уверен, насколько кроссплатформенным это может быть, но использование сигналов и сигналов тревоги может быть хорошим способом посмотреть на это. Немного поработав, вы можете сделать это полностью универсальным и пригодным для использования в любой ситуации.

http://docs.python.org/library/signal.html

Итак, ваш код будет выглядеть примерно так.

import signal

def signal_handler(signum, frame):
    raise Exception("Timed out!")

signal.signal(signal.SIGALRM, signal_handler)
signal.alarm(10)   # Ten seconds
try:
    long_function_call()
except Exception, msg:
    print "Timed out!"

Улучшение ответа @rik.the.vik будет заключаться в использовании with заявление, чтобы дать функции времени ожидания некоторый синтаксический сахар:

import signal
from contextlib import contextmanager

class TimeoutException(Exception): pass

@contextmanager
def time_limit(seconds):
    def signal_handler(signum, frame):
        raise TimeoutException("Timed out!")
    signal.signal(signal.SIGALRM, signal_handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)


try:
    with time_limit(10):
        long_function_call()
except TimeoutException as e:
    print("Timed out!")

Вот способ Linux/OSX ограничить время выполнения функции. Это в том случае, если вы не хотите использовать потоки и хотите, чтобы ваша программа ожидала завершения функции или истечения срока.

from multiprocessing import Process
from time import sleep

def f(time):
    sleep(time)


def run_with_limited_time(func, args, kwargs, time):
    """Runs a function with time limit

    :param func: The function to run
    :param args: The functions args, given as tuple
    :param kwargs: The functions keywords, given as dict
    :param time: The time limit in seconds
    :return: True if the function ended successfully. False if it was terminated.
    """
    p = Process(target=func, args=args, kwargs=kwargs)
    p.start()
    p.join(time)
    if p.is_alive():
        p.terminate()
        return False

    return True


if __name__ == '__main__':
    print run_with_limited_time(f, (1.5, ), {}, 2.5) # True
    print run_with_limited_time(f, (3.5, ), {}, 2.5) # False

Я предпочитаю подход контекстного менеджера, потому что он позволяет выполнять несколько операторов Python в пределах with time_limit заявление. Потому что система Windows не имеет SIGALARM, более портативный и, возможно, более простой метод мог бы использовать Timer

from contextlib import contextmanager
import threading
import _thread

class TimeoutException(Exception):
    def __init__(self, msg=''):
        self.msg = msg

@contextmanager
def time_limit(seconds, msg=''):
    timer = threading.Timer(seconds, lambda: _thread.interrupt_main())
    timer.start()
    try:
        yield
    except KeyboardInterrupt:
        raise TimeoutException("Timed out for operation {}".format(msg))
    finally:
        # if the action ends in specified time, timer is canceled
        timer.cancel()

import time
# ends after 5 seconds
with time_limit(5, 'sleep'):
    for i in range(10):
        time.sleep(1)

# this will actually end after 10 seconds
with time_limit(5, 'sleep'):
    time.sleep(10)

Ключевой техникой здесь является использование _thread.interrupt_main прервать основной поток из потока таймера. Одно предостережение заключается в том, что основной поток не всегда реагирует на KeyboardInterrupt поднятые Timer быстро. Например, time.sleep() вызывает системную функцию так KeyboardInterrupt будет обработан после sleep вызов.

Вот: простой способ получить желаемый эффект:

https://pypi.org/project/func-timeout

Это спасло мне жизнь.

А теперь пример того, как это работает: скажем, у вас есть огромный список элементов, которые нужно обработать, и вы выполняете итерацию своей функции по этим элементам. Однако по какой-то странной причине ваша функция застревает на элементе n, не вызывая исключения. Вам нужно обработать другие предметы, чем больше, тем лучше. В этом случае вы можете установить тайм-аут для обработки каждого элемента:

      import time
import func_timeout


def my_function(n):
    """Sleep for n seconds and return n squared."""
    print(f'Processing {n}')
    time.sleep(n)
    return n**2


def main_controller(max_wait_time, all_data):
    """
    Feed my_function with a list of itens to process (all_data).

    However, if max_wait_time is exceeded, return the item and a fail info.
    """
    res = []
    for data in all_data:
        try:
            my_square = func_timeout.func_timeout(
                max_wait_time, my_function, args=[data]
                )
            res.append((my_square, 'processed'))
        except func_timeout.FunctionTimedOut:
            print('error')
            res.append((data, 'fail'))
            continue

    return res


timeout_time = 2.1  # my time limit
all_data = [i for i in range(1, 10)]  # the data to be processed

res = main_controller(timeout_time, all_data)
print(res)

Делать это из обработчика сигнала опасно: вы можете находиться внутри обработчика исключений в тот момент, когда возникает исключение, и оставлять вещи в неисправном состоянии. Например,

def function_with_enforced_timeout():
  f = open_temporary_file()
  try:
   ...
  finally:
   here()
   unlink(f.filename)

Если ваше исключение возникает здесь (), временный файл никогда не будет удален.

Решение здесь заключается в том, чтобы асинхронные исключения откладывались до тех пор, пока код не окажется внутри кода обработки исключений (исключение или окончание блока), но Python этого не делает.

Обратите внимание, что это ничего не прерывает во время выполнения нативного кода; он прервет его только когда функция вернется, так что это может не помочь в этом конкретном случае. (Сам SIGALRM может прервать блокирующий вызов, но код сокета обычно просто повторяется после EINTR.)

Делать это с потоками - лучшая идея, поскольку она более переносима, чем сигналы. Так как вы запускаете рабочий поток и блокируете его до конца, обычные проблемы параллелизма отсутствуют. К сожалению, нет способа доставить исключение асинхронно другому потоку в Python (другие API потока могут сделать это). У него также будет та же проблема с отправкой исключения во время обработчика исключения, и потребуется такое же исправление.

Единственный "безопасный" способ сделать это на любом языке - это использовать вторичный процесс для выполнения этого тайм-аута, в противном случае вам нужно построить свой код таким образом, чтобы он сам по себе был безопасным, например, путем проверка времени, прошедшего в цикле или аналогичном. Если изменение метода не вариант, потока не будет достаточно.

Зачем? Потому что вы рискуете оставить что-то в плохом состоянии. Если поток просто прерван в середине метода, удержания блокировок и т. Д. Будут просто удерживаться и не могут быть освобождены.

Так что смотрите на путь процесса, не смотрите на путь потока.

Вам не нужно использовать темы. Вы можете использовать другой процесс для блокировки, например, используя модуль подпроцесса. Если вы хотите обмениваться структурами данных между различными частями вашей программы, тогда Twisted - это отличная библиотека для того, чтобы вы могли сами это контролировать, и я бы порекомендовал ее, если вы заботитесь о блокировке и ожидаете, что у вас будут много проблем. Плохая новость с Twisted заключается в том, что вы должны переписать свой код, чтобы избежать каких-либо блокировок.

Вы можете использовать потоки, чтобы избежать блокировки, но я бы расценил это как последнее средство, так как это подвергает вас целому миру боли. Прочитайте хорошую книгу о параллелизме, прежде чем даже думать об использовании потоков в производстве, например, "Параллельные системы" Жана Бэкона. Я работаю с группой людей, которые действительно крутят высокопроизводительные вещи с помощью потоков, и мы не внедряем потоки в проекты, если они нам действительно не нужны.

Я обычно предпочитаю использовать контекстный менеджер, как предложено @josh-lee

Но в случае, если кто-то заинтересован, чтобы это было реализовано в качестве декоратора, вот альтернатива.

Вот как это будет выглядеть:

import time
from timeout import timeout

class Test(object):
    @timeout(2)
    def test_a(self, foo, bar):
        print foo
        time.sleep(1)
        print bar
        return 'A Done'

    @timeout(2)
    def test_b(self, foo, bar):
        print foo
        time.sleep(3)
        print bar
        return 'B Done'

t = Test()
print t.test_a('python', 'rocks')
print t.test_b('timing', 'out')

И это timeout.py модуль:

import threading

class TimeoutError(Exception):
    pass

class InterruptableThread(threading.Thread):
    def __init__(self, func, *args, **kwargs):
        threading.Thread.__init__(self)
        self._func = func
        self._args = args
        self._kwargs = kwargs
        self._result = None

    def run(self):
        self._result = self._func(*self._args, **self._kwargs)

    @property
    def result(self):
        return self._result


class timeout(object):
    def __init__(self, sec):
        self._sec = sec

    def __call__(self, f):
        def wrapped_f(*args, **kwargs):
            it = InterruptableThread(f, *args, **kwargs)
            it.start()
            it.join(self._sec)
            if not it.is_alive():
                return it.result
            raise TimeoutError('execution expired')
        return wrapped_f

Выход:

python
rocks
A Done
timing
Traceback (most recent call last):
  ...
timeout.TimeoutError: execution expired
out

Обратите внимание, что даже если TimeoutError брошенный, декорированный метод будет продолжать работать в другом потоке. Если вы также хотите, чтобы этот поток был "остановлен", посмотрите: есть ли способ убить поток в Python?

Использование простого декоратора

Вот версия, которую я сделал после изучения приведенных выше ответов. Довольно прямолинейно.

      def function_timeout(seconds: int):
    """Wrapper of Decorator to pass arguments"""

    def decorator(func):
        @contextmanager
        def time_limit(seconds_):
            def signal_handler(signum, frame):  # noqa
                raise TimeoutException(f"Timed out in {seconds_} seconds!")

            signal.signal(signal.SIGALRM, signal_handler)
            signal.alarm(seconds_)
            try:
                yield
            finally:
                signal.alarm(0)

        @wraps(func)
        def wrapper(*args, **kwargs):
            with time_limit(seconds):
                return func(*args, **kwargs)

        return wrapper

    return decorator

Как использовать?

      @function_timeout(seconds=5)
def my_naughty_function():
    while True:
        print("Try to stop me ;-p")

Ну и конечно не забудьте импортировать функцию, если она находится в отдельном файле.

Вот функция тайм-аута, я думаю, я нашел через Google, и она работает для меня.

От: http://code.activestate.com/recipes/473878/

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    '''This function will spwan a thread and run the given function using the args, kwargs and 
    return the given default value if the timeout_duration is exceeded 
    ''' 
    import threading
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default
        def run(self):
            try:
                self.result = func(*args, **kwargs)
            except:
                self.result = default
    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    if it.isAlive():
        return it.result
    else:
        return it.result   

Этот код работает для Windows Server Datacenter 2016 с python 3.7.3, и я не тестировал его на Unix, после смешивания некоторых ответов от Google и StackOverflow он наконец сработал для меня следующим образом:

      from multiprocessing import Process, Lock
import time
import os

def f(lock,id,sleepTime):
    lock.acquire()
    print("I'm P"+str(id)+" Process ID: "+str(os.getpid()))
    lock.release()
    time.sleep(sleepTime)   #sleeps for some time
    print("Process: "+str(id)+" took this much time:"+str(sleepTime))
    time.sleep(sleepTime)
    print("Process: "+str(id)+" took this much time:"+str(sleepTime*2))

if __name__ == '__main__':
    timeout_function=float(9) # 9 seconds for max function time
    print("Main Process ID: "+str(os.getpid()))
    lock=Lock()
    p1=Process(target=f, args=(lock,1,6,))   #Here you can change from 6 to 3 for instance, so you can watch the behavior
    start=time.time()
    print(type(start))
    p1.start()
    if p1.is_alive():
        print("process running a")
    else:
        print("process not running a")
    while p1.is_alive():
        timeout=time.time()
        if timeout-start > timeout_function:
            p1.terminate()
            print("process terminated")
        print("watching, time passed: "+str(timeout-start) )
        time.sleep(1)
    if p1.is_alive():
        print("process running b")
    else:
        print("process not running b")
    p1.join()
    if p1.is_alive():
        print("process running c")
    else:
        print("process not running c")
    end=time.time()
    print("I am the main process, the two processes are done")
    print("Time taken:- "+str(end-start)+" secs")   #MainProcess terminates at approx ~ 5 secs.
    time.sleep(5) # To see if on Task Manager the child process is really being terminated, and it is
    print("finishing")

Основной код взят из этой ссылки:Создайте два дочерних процесса с помощью python (windows)

Затем я использовал .terminate()убить дочерний процесс. Вы можете видеть, что функция f вызывает 2 вывода, один через 5 секунд, а другой через 10 секунд. Однако при 7-секундном спящем режиме и методе terminate() последний отпечаток не отображается.

Это сработало для меня, надеюсь, это поможет!

Метод из @user2283347 протестирован работающим, но мы хотим избавиться от сообщений трассировки. Используйте трюк передачи из Удалить трассировку в Python на Ctrl-C, измененный код:

from contextlib import contextmanager
import threading
import _thread

class TimeoutException(Exception): pass

@contextmanager
def time_limit(seconds):
    timer = threading.Timer(seconds, lambda: _thread.interrupt_main())
    timer.start()
    try:
        yield
    except KeyboardInterrupt:
        pass     
    finally:
        # if the action ends in specified time, timer is canceled
        timer.cancel()

def timeout_svm_score(i):
     #from sklearn import svm
     #import numpy as np
     #from IPython.core.display import display
     #%store -r names X Y
     clf = svm.SVC(kernel='linear', C=1).fit(np.nan_to_num(X[[names[i]]]), Y)
     score = clf.score(np.nan_to_num(X[[names[i]]]),Y)
     #scoressvm.append((score, names[i]))
     display((score, names[i])) 
     
%%time
with time_limit(5):
    i=0
    timeout_svm_score(i)
#Wall time: 14.2 s

%%time
with time_limit(20):
    i=0
    timeout_svm_score(i)
#(0.04541284403669725, '计划飞行时间')
#Wall time: 16.1 s

%%time
with time_limit(5):
    i=14
    timeout_svm_score(i)
#Wall time: 5h 43min 41s

Мы видим, что этому методу может потребоваться много времени, чтобы прервать расчет, мы просили 5 секунд, но он работает через 5 часов.

Другие вопросы по тегам