Как очистить соединения после KeyboardInterrupt в python-trio

Мой класс, когда он подключен к серверу, должен немедленно отправить строку входа в систему, а после окончания сеанса он должен отправить строку выхода и очистить сокеты. Ниже мой код.

import trio

class test:

    _buffer = 8192
    _max_retry = 4

    def __init__(self, host='127.0.0.1', port=12345, usr='user', pwd='secret'):
        self.host = str(host)
        self.port = int(port)
        self.usr = str(usr)
        self.pwd = str(pwd)
        self._nl = b'\r\n'
        self._attempt = 0
        self._queue = trio.Queue(30)
        self._connected = trio.Event()
        self._end_session = trio.Event()

    @property
    def connected(self):
        return self._connected.is_set()

    async def _sender(self, client_stream, nursery):
        print('## sender: started!')
        q = self._queue
        while True:
            cmd = await q.get()
            print('## sending to the server:\n{!r}\n'.format(cmd))
            if self._end_session.is_set():
                nursery.cancel_scope.shield = True
                with trio.move_on_after(1):
                    await client_stream.send_all(cmd)
                nursery.cancel_scope.shield = False
            await client_stream.send_all(cmd)

    async def _receiver(self, client_stream, nursery):
        print('## receiver: started!')
        buff = self._buffer
        while True:
            data = await client_stream.receive_some(buff)
            if not data:
                print('## receiver: connection closed')
                self._end_session.set()
                break
            print('## got data from the server:\n{!r}'.format(data))

    async def _watchdog(self, nursery):
        await self._end_session.wait()
        await self._queue.put(self._logoff)
        self._connected.clear()
        nursery.cancel_scope.cancel()

    @property
    def _login(self, *a, **kw):
        nl = self._nl
        usr, pwd = self.usr, self.pwd
        return nl.join(x.encode() for x in ['Login', usr,pwd]) + 2*nl

    @property
    def _logoff(self, *a, **kw):
        nl = self._nl
        return nl.join(x.encode() for x in ['Logoff']) + 2*nl

    async def _connect(self):
        host, port = self.host, self.port
        print('## connecting to {}:{}'.format(host, port))
        try:
            client_stream = await trio.open_tcp_stream(host, port)
        except OSError as err:
            print('##', err)
        else:
            async with client_stream:
                self._end_session.clear()
                self._connected.set()
                self._attempt = 0
                # Sign in as soon as connected
                await self._queue.put(self._login)
                async with trio.open_nursery() as nursery:
                    print("## spawning watchdog...")
                    nursery.start_soon(self._watchdog, nursery)
                    print("## spawning sender...")
                    nursery.start_soon(self._sender, client_stream, nursery)
                    print("## spawning receiver...")
                    nursery.start_soon(self._receiver, client_stream, nursery)

    def connect(self):
        while self._attempt <= self._max_retry:
            try:
                trio.run(self._connect)
                trio.run(trio.sleep, 1)
                self._attempt += 1
            except KeyboardInterrupt:
                self._end_session.set()
                print('Bye bye...')
                break

tst = test()
tst.connect()

Моя логика не совсем работает. Ну, это работает, если я убью netcat слушатель, так что моя сессия выглядит следующим образом:

## connecting to 127.0.0.1:12345
## spawning watchdog...
## spawning sender...
## spawning receiver...
## receiver: started!
## sender: started!
## sending to the server:
b'Login\r\nuser\r\nsecret\r\n\r\n'

## receiver: connection closed
## sending to the server:
b'Logoff\r\n\r\n'

Обратите внимание, что Logoff Строка была отправлена, хотя здесь это не имеет смысла, так как к этому времени соединение уже разорвано.

Однако моя цель состоит в том, чтобы Logoff когда пользователь KeyboardInterrupt, В этом случае мой сеанс выглядит примерно так:

## connecting to 127.0.0.1:12345
## spawning watchdog...
## spawning sender...
## spawning receiver...
## receiver: started!
## sender: started!
## sending to the server:
b'Login\r\nuser\r\nsecret\r\n\r\n'

Bye bye...

Обратите внимание, что Logoff не был отправлен

Есть идеи?

1 ответ

Решение

Здесь ваше дерево вызовов выглядит примерно так:

connect
|
+- _connect*
   |
   +- _watchdog*
   |
   +- _sender*
   |
   +- _receiver*

*s обозначают 4 задания трио. _connect задание сидит в конце детского блока, ожидая, когда задание ребенка будет выполнено. _watchdog задача заблокирована в await self._end_session.wait(), _sender задача заблокирована в await q.get()и _receiver задача заблокирована в await client_stream.receive_some(...),

Когда вы нажимаете control-C, тогда стандартная семантика Python заключается в том, что независимо от того, какой бит кода Python выполняется, внезапно возникает KeyboardInterrupt, В этом случае у вас запущено 4 разные задачи, поэтому одна из этих заблокированных операций выбирается случайным образом [1] и вызывает KeyboardInterrupt, Это означает, что может произойти несколько разных вещей:

  • Если _watchdog"s wait колл поднимает KeyboardInterruptтогда _watchdog метод немедленно завершается, поэтому он даже не пытается отправить logout, Затем, как часть разматывания стека, трио отменяет все остальные задачи, и как только они вышли, KeyboardInterrupt продолжает распространяться, пока не достигнет вашего finally блокировать в connect, На этом этапе вы пытаетесь уведомить задачу сторожевого таймера, используя self._end_session.set(), но он больше не работает, поэтому не замечает.

  • Если _sender"s q.get() колл поднимает KeyboardInterruptтогда _sender метод сразу выходит, так что даже если _watchdog попросил его отправить сообщение о выходе из системы, его там не заметят. И в любом случае, трио затем все равно отменяет задачи сторожевого устройства и приемника, и все происходит, как описано выше.

  • Если _receiver"s receive_all колл поднимает KeyboardInterrupt... происходит то же самое.

  • Незначительные тонкости: _connect также может получить KeyboardInterrupt, что делает то же самое: отменяет всех детей, а затем ждет их остановки, прежде чем позволить KeyboardInterrupt продолжать размножаться.

Если вы хотите надежно поймать control-C, а затем что-то с ним сделать, то возникновение этого случая в некоторой случайной точке является довольно неприятным. Самый простой способ сделать это - использовать поддержку Trio для перехвата сигналов, чтобы пойматьsignal.SIGINTсигнал, который Python обычно преобразует вKeyboardInterrupt, ("INT" означает "прерывание".) Что-то вроде:

async def _control_c_watcher(self):
    # This API is currently a little cumbersome, sorry, see
    # https://github.com/python-trio/trio/issues/354
    with trio.catch_signals({signal.SIGINT}) as batched_signal_aiter:
        async for _ in batched_signal_aiter:
            self._end_session.set()
            # We exit the loop, restoring the normal behavior of
            # control-C. This way hitting control-C once will try to
            # do a polite shutdown, but if that gets stuck the user
            # can hit control-C again to raise KeyboardInterrupt and
            # force things to exit.
            break

и затем запустите это параллельно с другими вашими задачами.

У вас также есть проблема, которая в вашем _watchdog метод, он ставит logoff запрос в очередь - таким образом планируя сообщение, которое будет отправлено позже, _sender задача - и затем сразу отменяет все задачи, так что _sender Задание, вероятно, не получит шанс увидеть сообщение и отреагировать на него! В целом, мой код работает лучше, когда я использую задачи только тогда, когда это необходимо. Вместо того, чтобы иметь задачу отправителя и затем помещать сообщения в очередь, когда вы хотите отправить их, почему бы не иметь код, который хочет отправить сообщение, вызов stream.send_all напрямую? Единственное, на что вам следует обратить внимание: если у вас есть несколько задач, которые могут отправлять сообщения одновременно, вы можете использоватьtrio.Lock()чтобы убедиться, что они не сталкиваются друг с другом, позвонивsend_allв то же время:

async def send_all(self, data):
    async with self.send_lock:
        await self.send_stream.send_all(data)

async def do_logoff(self):
    # First send the message
    await self.send_all(b"Logoff\r\n\r\n")
    # And then, *after* the message has been sent, cancel the tasks
    self.nursery.cancel()

Если вы сделаете это таким образом, вы сможете избавиться от задачи сторожевого устройства и_end_sessionсобытие полностью.

Несколько других замечаний о вашем коде, пока я здесь:

  • призвание trio.runНесколько раз, как это необычно. Обычный стиль - вызывать его один раз в начале вашей программы и помещать в него весь свой реальный код. После выходаtrio.run, все состояние трио потеряно, вы определенно не выполняете никаких параллельных задач (так что нет ничего, что могло бы прослушивать и замечать ваш вызов _end_session.set()!). И вообще, почти все функции Trio предполагают, что вы уже внутри вызова trio.run, Оказывается, прямо сейчас вы можете позвонить trio.Queue() до начала трио без исключения, но это просто совпадение.

  • Использование экранирования внутри _sender выглядит странно для меня. Экранирование - это, как правило, расширенная функция, которую вы почти никогда не хотите использовать, и я не думаю, что это исключение.

Надеюсь, это поможет! И если вы хотите больше поговорить о таких проблемах стиля / дизайна, как это, но опасаетесь, что они могут быть слишком расплывчатыми для переполнения стека ("хорошо ли разработана эта программа?"), Тогда можете свободно заходить на канал чата трио.


[1] Ну, на самом деле трио, вероятно, выбирает основную задачу по разным причинам, но это не гарантировано и в любом случае это не имеет значения.

Другие вопросы по тегам