Как я могу получить эквивалент Python threading.Timer, который использует время работы системы?

TL; DR threading.Timer использует системное время, но время меняется, пока я его использую, как я могу использовать его во время работы системы?

У меня есть сценарий Python, который выполняет кучу вещей, один из которых устанавливает системное время. Когда этот скрипт запускается, время неверно. Этот сценарий также должен иметь глобальное время ожидания 30 секунд.

Я использовал следующий класс времени ожидания:

class Timeout(object):
    def __init__(self, seconds=1, signum=signal.SIGUSR1, exception=TimeoutException):
        self.exception = exception
        self.pid = os.getpid()
        self.signum = signum
        self.timer = threading.Timer(seconds, self.exit_function)

    def exit_function(self):
        os.kill(self.pid, self.signum)

    def handle_timeout(self, signum, frame):
        raise self.exception()

    def __enter__(self):
        signal.signal(self.signum, self.handle_timeout)
        self.timer.start()

    def __exit__(self, type, value, traceback):
        self.timer.cancel()

Который оборачивает весь мой сценарий:

with Timeout(seconds=30):
    main()

иногда сценарий очень быстро завершается неудачей или его не убивают через 30 секунд. Я считаю, что это потому, что threading.Timer использует системное время, которое изменяется во время работы скрипта. Могу ли я заставить его использовать систему безотказной работы?

2 ответа

Решение

Я закончил расширение threading.Timer использовать систему безотказной работы.

class Timer(threading._Timer):

    def __init__(self, *args, **kwargs):
        super(Timer, self).__init__(*args, **kwargs)

        # only works on Linux
        self._libc = ctypes.CDLL('libc.so.6')
        self._buf = ctypes.create_string_buffer(128)

    def uptime(self):
        self._libc.sysinfo(self._buf)
        return struct.unpack_from('@l', self._buf.raw)[0]

    def run(self):
        start_time = self.uptime()
        while not self.finished.is_set():
            time.sleep(0.1)
            if self.uptime() - start_time > self.interval:
                self.function(*self.args, **self.kwargs)
                break
        self.finished.set()

Похоже, вы используете Python < 3.3. На Python 3.3 или новее, monotonic будет псевдоним time.monotonic из стандартной библиотеки. time.monotonic Затем был также использован для threading -libary. Как сказано в документах:

Возвращает значение (в долях секунды) монотонных часов, т.е. часов, которые не могут двигаться назад. На часы не влияют обновления системных часов.

Итак, с Python >=3.3 threading.Timer будет независимым.


пытаясь решить вашу проблему: Windows

Я вижу возможность получить желаемое время работы системы, используя gettickcount от kernel32.dll с помощью ctypes:

import ctypes
kernel_32 = ctypes.cdll.LoadLibrary("Kernel32.dll")
kernel_32.GetTickCount()  # overflow after 49.7 days
kernel_32.GetTickCount64()

с помощью этого вы можете создать свой собственный таймер с чем-то несоответствующим:

def sleep(time):
    start = kernel_32.GetTickCount64()
    end = start + time
    while kernel_32.GetTickCount64() < end:
        pass
    print('done')

Я очень надеюсь, что этот подход поможет с вашей проблемой - удачи


РЕДАКТИРОВАТЬ: Linux

основанный на монотонности: вы можете попробовать это как альтернативу getTickCount 1:

try:
    clock_gettime = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True).clock_gettime
except Exception:
    clock_gettime = ctypes.CDLL(ctypes.util.find_library('rt'), use_errno=True).clock_gettime

class timespec(ctypes.Structure):
    """Time specification, as described in clock_gettime(3)."""
    _fields_ = (('tv_sec', ctypes.c_long), ('tv_nsec', ctypes.c_long))

if sys.platform.startswith('linux'):
    CLOCK_MONOTONIC = 1
elif sys.platform.startswith('freebsd'):
    CLOCK_MONOTONIC = 4
elif sys.platform.startswith('sunos5'):
    CLOCK_MONOTONIC = 4
elif 'bsd' in sys.platform:
    CLOCK_MONOTONIC = 3
elif sys.platform.startswith('aix'):
    CLOCK_MONOTONIC = ctypes.c_longlong(10)

def monotonic():
    """Monotonic clock, cannot go backward."""
    ts = timespec()
    if clock_gettime(CLOCK_MONOTONIC, ctypes.pointer(ts)):
        errno = ctypes.get_errno()
        raise OSError(errno, os.strerror(errno))
    return ts.tv_sec + ts.tv_nsec / 1.0e9

1: Copyright 2014, 2015, 2016 Ori Livneh Лицензировано по лицензии Apache, версия 2.0 ("Лицензия"); Вы не можете использовать этот файл, кроме как в соответствии с Лицензией. Вы можете получить копию Лицензии по адресу http://www.apache.org/licenses/LICENSE-2.0 Если это не требуется действующим законодательством или не согласовано в письменной форме, программное обеспечение, распространяемое по Лицензии, распространяется на ОСНОВЕ "КАК ЕСТЬ", БЕЗ ГАРАНТИЙ ИЛИ УСЛОВИЙ ЛЮБОГО ВИДА, явных или подразумеваемых.

Другие вопросы по тегам