Обработка вызова блокирующей функции в Python

Я работаю со структурой Gnuradio. Я обрабатываю потоковые диаграммы, которые я генерирую для отправки / получения сигналов. Эти потоковые диаграммы инициализируются и запускаются, но они не возвращают поток управления моему приложению:

Я импортировал time

while time.time() < endtime:
        # invoke GRC flowgraph for 1st sequence
        if not seq1_sent:
            tb = send_seq_2.top_block()
            tb.Run(True)
            seq1_sent = True
            if time.time() < endtime:
                break

        # invoke GRC flowgraph for 2nd sequence
        if not seq2_sent:
            tb = send_seq_2.top_block()
            tb.Run(True)
            seq2_sent = True
            if time.time() < endtime:
                break

Проблема в том, что только первый оператор if вызывает потоковый граф (который взаимодействует с оборудованием). Я застрял в этом. Я мог бы использовать поток, но я неопытен, как тайм-аут потоков в Python. Я сомневаюсь, что это возможно, потому что кажется, что уничтожение потоков не в API. Этот скрипт должен работать только на Linux...

Как вы правильно обрабатываете блокирующие функции с Python - не убивая всю программу. Еще один более конкретный пример этой проблемы:

import signal, os

def handler(signum, frame):
        # print 'Signal handler called with signal', signum
        #raise IOError("Couldn't open device!")
        import time
        print "wait"
        time.sleep(3)


def foo():
    # Set the signal handler and a 5-second alarm
    signal.signal(signal.SIGALRM, handler)
    signal.alarm(3)

    # This open() may hang indefinitely
    fd = os.open('/dev/ttys0', os.O_RDWR)
    signal.alarm(0)          # Disable the alarm


foo()
print "hallo"

Как я все еще получаю print "hallo",;)

Спасибо Мариус

8 ответов

Решение

Прежде всего - следует избегать использования сигналов любой ценой:

1) Это может привести к тупику. SIGALRM может дойти до процесса до системного вызова блокировки (представьте сверхвысокую нагрузку в системе!), И системный вызов не будет прерван. Тупик.

2) Игра с сигналами может иметь неприятные нелокальные последствия. Например, системные вызовы в других потоках могут прерываться, что обычно не то, что вам нужно. Обычно системные вызовы перезапускаются при получении (не смертельного) сигнала. Когда вы устанавливаете обработчик сигнала, он автоматически отключает это поведение для всего процесса или, так сказать, группы потоков. Проверьте 'man siginterrupt' на это.

Поверьте мне, я встречал две проблемы раньше, и они совсем не веселые.

В некоторых случаях блокировку можно избежать явно - я настоятельно рекомендую использовать select() и friends (проверьте модуль select в Python) для обработки блокирующих операций записи и чтения. Это не решит проблему блокировки вызова open().

Для этого я протестировал это решение, и оно хорошо работает для именованных каналов. Он открывается неблокирующим способом, затем выключает его и использует вызов select(), чтобы в конечном итоге остановить время, если ничего не доступно.

import sys, os, select, fcntl

f = os.open(sys.argv[1], os.O_RDONLY | os.O_NONBLOCK)

flags = fcntl.fcntl(f, fcntl.F_GETFL, 0)
fcntl.fcntl(f, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)

r, w, e = select.select([f], [], [], 2.0)

if r == [f]:
    print 'ready'
    print os.read(f, 100)
else:
    print 'unready'

os.close(f)

Проверьте это с помощью:

mkfifo /tmp/fifo
python <code_above.py> /tmp/fifo (1st terminal)
echo abcd > /tmp/fifo (2nd terminal)

С некоторыми дополнительными усилиями вызов select() можно использовать в качестве основного цикла всей программы, объединяющего все события - вы можете использовать libev или libevent или несколько оболочек Python вокруг них.

Если вы не можете явно принудительно вызвать неблокирующее поведение, скажем, вы просто используете внешнюю библиотеку, тогда это будет намного сложнее. Потоки могут делать, но, очевидно, это не современное решение, как правило, просто неправильно.

Боюсь, что в целом вы не можете решить эту проблему надежным способом - это действительно зависит от того, ЧТО вы блокируете.

IIUC, у каждого top_block есть метод stop. Таким образом, вы можете запустить top_block в потоке и выполнить остановку, если истекло время ожидания. Было бы лучше, если бы у top_block wait() также был тайм-аут, но, увы, это не так.

В главном потоке вам нужно подождать два случая: a) завершение top_block и b) истечение времени ожидания. Ожидания заняты - это зло:-), поэтому вы должны использовать время ожидания соединения для ожидания потока. Если поток еще жив после объединения, вам нужно остановить top_run.

Вы можете установить сигнал будильника, который прервет ваш звонок с тайм-аутом:

http://docs.python.org/library/signal.html

signal.alarm(1) # 1 second

my_blocking_call()
signal.alarm(0)

Вы также можете установить обработчик сигнала, если хотите убедиться, что он не разрушит ваше приложение:

def my_handler(signum, frame):
    pass

signal.signal(signal.SIGALRM, my_handler)

РЕДАКТИРОВАТЬ: Что не так с этим куском кода? Это не должно прерывать ваше приложение:

import signal, time

def handler(signum, frame):
    print "Timed-out"

def foo():
    # Set the signal handler and a 5-second alarm
    signal.signal(signal.SIGALRM, handler)
    signal.alarm(3)

    # This open() may hang indefinitely
    time.sleep(5)
    signal.alarm(0)          # Disable the alarm


foo()
print "hallo"

Дело в том:

  1. Обработчик по умолчанию для SIGALRM - это прерывание приложения. Если вы установите свой обработчик, он больше не должен останавливать приложение.

  2. Получение сигнала обычно прерывает системные вызовы (затем разблокирует ваше приложение)

Легкая часть вашего вопроса связана с обработкой сигналов. С точки зрения среды выполнения Python, сигнал, полученный во время выполнения системным вызовом интерпретатором, представляется вашему коду Python как исключение OSError с errno приписывается соответствующий errno.EINTR

Так что это, вероятно, работает примерно так, как вы и предполагали:

    #!/usr/bin/env python
    import signal, os, errno, time

    def handler(signum, frame):
            # print 'Signal handler called with signal', signum
            #raise IOError("Couldn't open device!")
            print "timed out"
            time.sleep(3)


    def foo():
        # Set the signal handler and a 5-second alarm
        signal.signal(signal.SIGALRM, handler)

        try:
            signal.alarm(3)
            # This open() may hang indefinitely
            fd = os.open('/dev/ttys0', os.O_RDWR)
        except OSError, e:
            if e.errno != errno.EINTR:
                raise e
        signal.alarm(0)          # Disable the alarm

    foo()
    print "hallo"

Обратите внимание, что я перенес импорт time из определения функции, поскольку кажется, что это плохая форма, чтобы скрыть импорт таким образом. Мне совсем не понятно, почему вы спите в обработчике сигналов, и на самом деле это кажется довольно плохой идеей.

Ключевой момент, который я пытаюсь сделать, заключается в том, что любой (не игнорируемый) сигнал прервет вашу основную строку выполнения кода Python. Ваш обработчик будет вызываться с аргументами, указывающими, какой номер сигнала вызвал выполнение (позволяя использовать одну функцию Python для обработки множества различных сигналов) и объект фрейма (который можно использовать для отладки или инструментария некоторого вида).

Поскольку основной поток через код прерывается, вам необходимо обернуть этот код в некоторую обработку исключений, чтобы восстановить контроль после таких событий. (Между прочим, если вы пишете код на C, у вас возникнет та же проблема; вы должны быть готовы к любым из ваших библиотечных функций с базовыми системными вызовами, чтобы возвращать ошибки и обрабатывать -EINTR в системе с ошибками, возвращаясь к повторным попыткам или переход к некоторой альтернативе в вашей основной строке (например, переход к другому файлу или без какого-либо файла / ввода и т. д.).

Как указали другие в своих ответах на ваш вопрос, основание вашего подхода на SIGALARM, вероятно, будет чревато проблемами переносимости и надежности. Хуже того, некоторые из этих проблем могут быть состояниями гонки, с которыми вы никогда не столкнетесь в своей среде тестирования, и могут возникать только в условиях, которые чрезвычайно трудно воспроизвести. Уродливые детали имеют место в случаях повторного входа --- что произойдет, если сигналы отправляются во время выполнения вашего обработчика сигнала?

Я использовал SIGALARM в некоторых сценариях, и для меня это не было проблемой под Linux. Код, над которым я работал, подходил для этой задачи. Это может быть достаточно для ваших нужд.

На ваш основной вопрос сложно ответить, не зная больше о том, как ведет себя этот код Gnuradio, какие объекты вы создаете из него, и какие объекты они возвращают.

Взглянув на документы, на которые вы ссылались, я вижу, что они, похоже, не предлагают какого-либо аргумента или параметра "тайм-аута", которые можно было бы использовать для непосредственного ограничения блокирующего поведения. В таблице в разделе "Управление потоковыми графиками" я вижу, что они специально говорят, что .run() может выполняться бесконечно или до получения SIGINT. Я также отмечаю, что .start() может запускать потоки в вашем приложении и, похоже, возвращает управление вашей строке кода Python во время их работы. (Кажется, это зависит от характера ваших потоковых графиков, которые я не понимаю достаточно).

Похоже, вы могли бы создать свои потоковые графики, .start() их, а затем (после некоторого времени обработки или спящего в вашей основной строке кода Python) вызвать .lock() метод на вашем контролирующем объекте (tb?). Я предполагаю, что это переводит представление состояния Python... объект Python... в режим покоя, чтобы вы могли запрашивать состояние или, как говорится, перенастраивать свой потоковый граф. Если вы позвоните .run() это позвонит .wait() после звонка .start(); а также .wait() очевидно будет работать, пока все блоки не "покажут, что они сделаны" или пока вы не вызовете объект .stop() метод.

Похоже, вы хотите использовать .start() и ни .run() ни .wait(); затем позвоните .stop() после выполнения любой другой обработки (включая time.sleep()).

Возможно, что-то так просто, как:

    tb = send_seq_2.top_block()
    tb.start()
    time.sleep(endtime - time.time())
    tb.stop()
    seq1_sent = True
    tb = send_seq_2.top_block()
    tb.start()
    seq2_sent = True

.. хотя я с подозрением отношусь к моей time.sleep() там. Возможно, вы хотите сделать что-то еще, где вы запрашиваете tb состояние объекта (возможно, в результате сна в течение меньших интервалов, вызывая его .lock() метод, и доступ к атрибутам, о которых я ничего не знаю, а затем вызывая его .unlock() прежде чем снова спать.

if not seq1_sent:
        tb = send_seq_2.top_block()
        tb.Run(True)
        seq1_sent = True
        if time.time() < endtime:
            break

Если 'if time.time() endtime' в этом тесте?

Вы можете попробовать использовать отложенное выполнение... Twisted Framework использует их много

http://www6.uniovi.es/python/pycon/papers/deferex/

Вы упоминаете уничтожение потоков в Python - это частично возможно, хотя вы можете убить / прервать другой поток только при запуске кода Python, а не в коде C, так что это может вам не помочь, как вы хотите.

см. ответ на другой вопрос: python: как отправлять пакеты в многопоточном режиме, а затем поток уничтожает себя

или поищите в гугле нити python для получения более подробной информации, например: http://code.activestate.com/recipes/496960-thread2-killable-threads/

Если вы хотите установить тайм-аут для функции блокировки, используйте threading.Thread как метод join(timeout), который блокирует до времени ожидания.

В принципе, что-то подобное должно делать то, что вы хотите:

import threading
my_thread = threading.Thread(target=send_seq_2.top_block)
my_thread.start()
my_thread.join(TIMEOUT)
Другие вопросы по тегам