Как закрыть python asyncio транспорт?

Я работаю над проектом, который использует Python Asyncio сокет-сервер. Проблема заключается в том, что реализация сервера не вызывает.close() для транспорта, когда сервер останавливается. Это, кажется, оставляет клиентов подключенными и вызывает сбои в других частях кода.

Документы Python говорят, что транспорты должны быть явно закрыты, но в этом проекте я не знаю, где их можно закрыть, потому что нет ссылок на транспорты, созданные для каждого клиента. https://docs.python.org/3/library/asyncio-dev.html

Вот код:

"""
Socket server forwarding request to internal server
"""
import logging
try:
    # we prefer to use bundles asyncio version, otherwise fallback to trollius
    import asyncio
except ImportError:
    import trollius as asyncio


from opcua import ua
from opcua.server.uaprocessor import UaProcessor

logger = logging.getLogger(__name__)


class BinaryServer(object):

    def __init__(self, internal_server, hostname, port):
        self.logger = logging.getLogger(__name__)
        self.hostname = hostname
        self.port = port
        self.iserver = internal_server
        self.loop = internal_server.loop
        self._server = None
        self._policies = []

    def set_policies(self, policies):
        self._policies = policies

    def start(self):

        class OPCUAProtocol(asyncio.Protocol):

            """
            instanciated for every connection
            defined as internal class since it needs access
            to the internal server object
            FIXME: find another solution
            """

            iserver = self.iserver
            loop = self.loop
            logger = self.logger
            policies = self._policies

            def connection_made(self, transport):
                self.peername = transport.get_extra_info('peername')
                self.logger.info('New connection from %s', self.peername)
                self.transport = transport
                self.processor = UaProcessor(self.iserver, self.transport)
                self.processor.set_policies(self.policies)
                self.data = b""

            def connection_lost(self, ex):
                self.logger.info('Lost connection from %s, %s', self.peername, ex)
                self.transport.close()
                self.processor.close()

            def data_received(self, data):
                logger.debug("received %s bytes from socket", len(data))
                if self.data:
                    data = self.data + data
                    self.data = b""
                self._process_data(data)

            def _process_data(self, data):
                buf = ua.utils.Buffer(data)
                while True:
                    try:
                        backup_buf = buf.copy()
                        try:
                            hdr = ua.Header.from_string(buf)
                        except ua.utils.NotEnoughData:
                            logger.info("We did not receive enough data from client, waiting for more")
                            self.data = backup_buf.read(len(backup_buf))
                            return
                        if len(buf) < hdr.body_size:
                            logger.info("We did not receive enough data from client, waiting for more")
                            self.data = backup_buf.read(len(backup_buf))
                            return
                        ret = self.processor.process(hdr, buf)
                        if not ret:
                            logger.info("processor returned False, we close connection from %s", self.peername)
                            self.transport.close()
                            return
                        if len(buf) == 0:
                            return
                    except Exception:
                        logger.exception("Exception raised while parsing message from client, closing")
                        self.transport.close()
                        break

        coro = self.loop.create_server(OPCUAProtocol, self.hostname, self.port)
        self._server = self.loop.run_coro_and_wait(coro)
        print('Listening on {}:{}'.format(self.hostname, self.port))

    def stop(self):
        self.logger.info("Closing asyncio socket server")
        self.loop.call_soon(self._server.close)
        self.loop.run_coro_and_wait(self._server.wait_closed())

Как вы можете видеть, когда мы вызываем stop() для этого класса сервера, сервер asyncio вызывает его метод close. Однако если клиенты подключены, созданные транспорты никогда не закрываются.

Репозиторий проекта находится здесь https://github.com/FreeOpcUa/python-opcua/, вы можете взглянуть на выпуск 137.

Как правильно закрыть транспортный объект?

1 ответ

Я решаю это, применяя такой подход:

#self.OPCUAServer - это мой сервер opcua

узлы = []

nodes.append (self.OPCUAServer.get_node("ns=0; s=Measurements")) # Добавление двух корневых узлов nodes.append (self.OPCUAServer.get_node("ns=1; s=Calibrations")) # в listself.OPCUAServer.delete_nodes(nodes, True) # Рекурсивно вызывать delete_nodes с этим списком self.OPCUAServer.stop()

Другие вопросы по тегам