Как отключить длительно работающую программу с использованием rxpython?
Скажите, у меня есть долго работающая функция Python, которая выглядит примерно так?
import random
import time
from rx import Observable
def intns(x):
y = random.randint(5,10)
print(y)
print('begin')
time.sleep(y)
print('end')
return x
Я хочу иметь возможность установить тайм-аут 1000ms
,
Итак, я что-то вроде, создаю наблюдаемое и отображаю его с помощью вышеупомянутого интенсивного расчета.
a = Observable.repeat(1).map(lambda x: intns(x))
Теперь для каждого испускаемого значения, если оно занимает более 1000 мс, я хочу завершить наблюдаемое, как только достигну 1000ms
с помощью on_error
или же on_completed
a.timeout(1000).subscribe(lambda x: print(x), lambda x: print(x))
вышеупомянутое утверждение получает тайм-аут, и вызывает on_error
, но он продолжает завершать вычисление интенсивного расчета и только затем возвращается к следующим утверждениям. Есть ли лучший способ сделать это?
Последнее утверждение печатает следующее
8 # no of seconds to sleep
begin # begins sleeping, trying to emit the first value
Timeout # operation times out, and calls on_error
end # thread waits till the function ends
Идея состоит в том, что, если время ожидания определенной функции истекло, я хочу продолжить работу с моей программой и игнорировать результат.
Мне было интересно, если intns
функция была выполнена в отдельном потоке, я думаю, что основной поток продолжает выполняться после истечения времени ожидания, но я все еще хочу прекратить вычисления intns
функционировать в потоке, или убить его как-нибудь.
5 ответов
Ниже приведен класс, который можно вызвать с помощью with timeout() :
Если блок под кодом выполняется дольше указанного времени, TimeoutError
Поднялся.
import signal
class timeout:
# Default value is 1 second (1000ms)
def __init__(self, seconds=1, error_message='Timeout'):
self.seconds = seconds
self.error_message = error_message
def handle_timeout(self, signum, frame):
raise TimeoutError(self.error_message)
def __enter__(self):
signal.signal(signal.SIGALRM, self.handle_timeout)
signal.alarm(self.seconds)
def __exit__(self, type, value, traceback):
signal.alarm(0)
# example usage
with timeout() :
# infinite while loop so timeout is reached
while True :
pass
Если я понимаю вашу функцию, вот как будет выглядеть ваша реализация:
def intns(x):
y = random.randint(5,10)
print(y)
print('begin')
with timeout() :
time.sleep(y)
print('end')
return x
Вот пример для тайм-аута
import random
import time
import threading
_timeout = 0
def intns(loops=1):
print('begin')
processing = 0
for i in range(loops):
y = random.randint(5,10)
time.sleep(y)
if _timeout == 1:
print('timedout end')
return
print('keep processing')
return
# this will timeout
timeout_seconds = 10
loops = 10
# this will complete
#timeout_seconds = 30.0
#loops = 1
thr = threading.Thread(target=intns, args=([loops]), kwargs={})
thr.start()
st = time.clock();
while(thr.is_alive() == True):
if(time.clock() - st > timeout_seconds):
_timeout = 1
thr.join()
if _timeout == 0:
print ("completed")
else:
print ("timed-out")
Вы можете сделать это частично, используя многопоточность Хотя в python нет конкретного способа уничтожить нить, вы можете реализовать метод, помечающий нить как завершающую.
Это не будет работать, если поток ожидает других ресурсов (в вашем случае вы смоделировали "длинный" код при случайном ожидании)
Смотрите также Есть ли способ убить поток в Python?
Вот так это работает:
import random
import time
import threading
import os
def intns(x):
y = random.randint(5,10)
print(y)
print('begin')
time.sleep(y)
print('end')
return x
thr = threading.Thread(target=intns, args=([10]), kwargs={})
thr.start()
st = time.clock();
while(thr.is_alive() == True):
if(time.clock() - st > 9):
os._exit(0)
Вы можете использовать time.sleep() и сделать цикл while для time.clock()