Как закрыть python asyncio транспорт?
Я работаю над проектом, который использует Python Asyncio сокет-сервер. Проблема заключается в том, что реализация сервера не вызывает.close() для транспорта, когда сервер останавливается. Это, кажется, оставляет клиентов подключенными и вызывает сбои в других частях кода.
Документы Python говорят, что транспорты должны быть явно закрыты, но в этом проекте я не знаю, где их можно закрыть, потому что нет ссылок на транспорты, созданные для каждого клиента. https://docs.python.org/3/library/asyncio-dev.html
Вот код:
"""
Socket server forwarding request to internal server
"""
import logging
try:
# we prefer to use bundles asyncio version, otherwise fallback to trollius
import asyncio
except ImportError:
import trollius as asyncio
from opcua import ua
from opcua.server.uaprocessor import UaProcessor
logger = logging.getLogger(__name__)
class BinaryServer(object):
def __init__(self, internal_server, hostname, port):
self.logger = logging.getLogger(__name__)
self.hostname = hostname
self.port = port
self.iserver = internal_server
self.loop = internal_server.loop
self._server = None
self._policies = []
def set_policies(self, policies):
self._policies = policies
def start(self):
class OPCUAProtocol(asyncio.Protocol):
"""
instanciated for every connection
defined as internal class since it needs access
to the internal server object
FIXME: find another solution
"""
iserver = self.iserver
loop = self.loop
logger = self.logger
policies = self._policies
def connection_made(self, transport):
self.peername = transport.get_extra_info('peername')
self.logger.info('New connection from %s', self.peername)
self.transport = transport
self.processor = UaProcessor(self.iserver, self.transport)
self.processor.set_policies(self.policies)
self.data = b""
def connection_lost(self, ex):
self.logger.info('Lost connection from %s, %s', self.peername, ex)
self.transport.close()
self.processor.close()
def data_received(self, data):
logger.debug("received %s bytes from socket", len(data))
if self.data:
data = self.data + data
self.data = b""
self._process_data(data)
def _process_data(self, data):
buf = ua.utils.Buffer(data)
while True:
try:
backup_buf = buf.copy()
try:
hdr = ua.Header.from_string(buf)
except ua.utils.NotEnoughData:
logger.info("We did not receive enough data from client, waiting for more")
self.data = backup_buf.read(len(backup_buf))
return
if len(buf) < hdr.body_size:
logger.info("We did not receive enough data from client, waiting for more")
self.data = backup_buf.read(len(backup_buf))
return
ret = self.processor.process(hdr, buf)
if not ret:
logger.info("processor returned False, we close connection from %s", self.peername)
self.transport.close()
return
if len(buf) == 0:
return
except Exception:
logger.exception("Exception raised while parsing message from client, closing")
self.transport.close()
break
coro = self.loop.create_server(OPCUAProtocol, self.hostname, self.port)
self._server = self.loop.run_coro_and_wait(coro)
print('Listening on {}:{}'.format(self.hostname, self.port))
def stop(self):
self.logger.info("Closing asyncio socket server")
self.loop.call_soon(self._server.close)
self.loop.run_coro_and_wait(self._server.wait_closed())
Как вы можете видеть, когда мы вызываем stop() для этого класса сервера, сервер asyncio вызывает его метод close. Однако если клиенты подключены, созданные транспорты никогда не закрываются.
Репозиторий проекта находится здесь https://github.com/FreeOpcUa/python-opcua/, вы можете взглянуть на выпуск 137.
Как правильно закрыть транспортный объект?
1 ответ
Я решаю это, применяя такой подход:
#self.OPCUAServer - это мой сервер opcua
узлы = []
nodes.append (self.OPCUAServer.get_node("ns=0; s=Measurements")) # Добавление двух корневых узлов nodes.append (self.OPCUAServer.get_node("ns=1; s=Calibrations")) # в listself.OPCUAServer.delete_nodes(nodes, True) # Рекурсивно вызывать delete_nodes с этим списком self.OPCUAServer.stop()