Менеджер контекста времени ожидания Python с потоками
Я имею timeout
менеджер контекста, который отлично работает с сигналами, но вызывает ошибку в многопоточном режиме, потому что сигналы работают только в основном потоке.
def timeout_handler(signum, frame):
raise TimeoutException()
@contextmanager
def timeout(seconds):
old_handler = signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(seconds)
try:
yield
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)
Я видел реализацию декоратора timeout
но я не знаю, как пройти yield
внутри класса, полученного из threading.Thread
, Мой вариант не сработает.
@contextmanager
def timelimit(seconds):
class FuncThread(threading.Thread):
def run(self):
yield
it = FuncThread()
it.start()
it.join(seconds)
if it.isAlive():
raise TimeoutException()
5 ответов
Если код, защищенный менеджером контекста, основан на цикле, подумайте о том, чтобы обрабатывать это так, как люди обрабатывают уничтожение потоков. Уничтожение другого потока, как правило, небезопасно, поэтому стандартный подход заключается в том, чтобы управляющий поток установил флаг, видимый для рабочего потока. Рабочий поток периодически проверяет этот флаг и автоматически отключается. Вот как вы можете сделать что-то аналогичное с таймаутами:
class timeout(object):
def __init__(self, seconds):
self.seconds = seconds
def __enter__(self):
self.die_after = time.time() + self.seconds
return self
def __exit__(self, type, value, traceback):
pass
@property
def timed_out(self):
return time.time() > self.die_after
Вот пример однопоточного использования:
with timeout(1) as t:
while True: # this will take a long time without a timeout
# periodically check for timeouts
if t.timed_out:
break # or raise an exception
# do some "useful" work
print "."
time.sleep(0.2)
и многопоточный:
import thread
def print_for_n_secs(string, seconds):
with timeout(seconds) as t:
while True:
if t.timed_out:
break # or raise an exception
print string,
time.sleep(0.5)
for i in xrange(5):
thread.start_new_thread(print_for_n_secs,
('thread%d' % (i,), 2))
time.sleep(0.25)
Этот подход более навязчив, чем использование сигналов, но он работает для произвольных потоков.
Я не вижу способа сделать то, что вы предлагаете, с помощью контекстного менеджера, вы не можете yield
поток из одного потока в другой. То, что я хотел бы сделать, это обернуть вашу функцию прерывистым потоком с таймаутом. Вот рецепт для этого.
У вас будет дополнительный поток, и синтаксис будет не таким хорошим, но он будет работать.
Я знаю, что уже поздно, но я только сейчас читаю это, но как насчет создания вашего собственного сигнализатора / менеджера контекста? Я новичок в Python будет любить отзывы опытных разработчиков этой реализации.
Это основано на ответе от "Мистер Фуз"
class TimeoutSignaller(Thread):
def __init__(self, limit, handler):
Thread.__init__(self)
self.limit = limit
self.running = True
self.handler = handler
assert callable(handler), "Timeout Handler needs to be a method"
def run(self):
timeout_limit = datetime.datetime.now() + datetime.timedelta(seconds=self.limit)
while self.running:
if datetime.datetime.now() >= timeout_limit:
self.handler()
self.stop_run()
break
def stop_run(self):
self.running = False
class ProcessContextManager:
def __init__(self, process, seconds=0, minutes=0, hours=0):
self.seconds = (hours * 3600) + (minutes * 60) + seconds
self.process = process
self.signal = TimeoutSignaller(self.seconds, self.signal_handler)
def __enter__(self):
self.signal.start()
return self.process
def __exit__(self, exc_type, exc_val, exc_tb):
self.signal.stop_run()
def signal_handler(self):
# Make process terminate however you like
# using self.process reference
raise TimeoutError("Process took too long to execute")
Случай использования:
with ProcessContextManager(my_proc) as p:
# do stuff e.g.
p.execute()
Реализация похожа на Mr Fooz, но с использованиемcontextlib
библиотека:
import time
from contextlib import contextmanager
@contextmanager
def timeout(seconds):
"""
A simple context manager to enable timeouts.
Example:
with timeout(5) as t:
while True:
if t():
# handle
"""
stop = time.time() + seconds
def timed_out():
return time.time() > stop
yield timed_out
Тайм-ауты для системных вызовов выполняются с помощью сигналов. Большинство блокирующих системных вызовов возвращаются с EINTR при возникновении сигнала, поэтому вы можете использовать будильник для реализации тайм-аутов.
Вот менеджер контекста, который работает с большинством системных вызовов, вызывая IOError из системного вызова блокировки, если это занимает слишком много времени.
import signal, errno
from contextlib import contextmanager
import fcntl
@contextmanager
def timeout(seconds):
def timeout_handler(signum, frame):
pass
original_handler = signal.signal(signal.SIGALRM, timeout_handler)
try:
signal.alarm(seconds)
yield
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, original_handler)
with timeout(1):
f = open("test.lck", "w")
try:
fcntl.flock(f.fileno(), fcntl.LOCK_EX)
except IOError, e:
if e.errno != errno.EINTR:
raise e
print "Lock timed out"