Asyncio Фатальная ошибка: сбой вызова protocol.data_received()

Когда клиент отправляет большое сообщение ~5300 байт, asyncio получает 2917-2923 байт, а затем выдает эту ошибку.Python 3.7. Неполученное сообщение заканчивается так

{"Деталь":4,"активный" ложь "пит":2,"turn_touch":0,"damaged_in_turn":0},{"деталь":7,"активный": FAL

И, конечно же, JSON не смог разобрать это сообщение.

Журнал ошибок

ERROR:asyncio:Fatal error: protocol.data_received() call failed.
protocol: <__main__.Server object at 0xb5a2e0ac>
transport: <_SelectorSocketTransport fd=9 read=polling write=<idle, bufsize=0>>
Traceback (most recent call last):
  File "/usr/lib/python3.7/asyncio/selector_events.py", line 824, in _read_ready__data_received
    self._protocol.data_received(data)
  File "/home/den/piratebay/server.py", line 41, in data_received
    self.message_handle.check_message_id(self, data)
  File "/home/den/piratebay/message_handle.py", line 25, in check_message_id
    self.parsed_data = parse_data(data)
  File "/home/den/piratebay/action.py", line 50, in parse_data
    return json.loads(data)
  File "/usr/lib/python3.7/json/__init__.py", line 348, in loads
    return _default_decoder.decode(s)
  File "/usr/lib/python3.7/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/lib/python3.7/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 2918 (char 2917)

server.py

class Server(asyncio.Protocol):
    def connection_made(self, transport):
        logging.debug('connection_made')
        """ Called on instantiation, when new client connects """
        self.transport = transport
        self.addr = transport.get_extra_info('peername')
        s = transport.get_extra_info("socket")
        print('Connection from {}'.format(self.addr))


    def data_received(self, data):
        """ Handle data as it's received.  """
        logging.debug('received {} bytes'.format(len(data)))
        data = data.decode()
        print('received data->',data)
        self.message_handle.check_message_id(self, data)


    def connection_lost(self, ex):
        """ Called on client disconnect. Clean up client state """       
        self.transport.close()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    # Create server and initialize on the event loop
    coroutine = loop.create_server(Server,
                                   host=HOST,
                                   port=PORT)

Что является причиной этой ошибки? Спасибо заранее

1 ответ

Решение

Там нет никакой гарантии, что один data_received получит все сообщение уровня приложения. TCP - это потоковый протокол, который даже не предоставляет концепцию сообщения, а только поток байтов. Одна запись на стороне клиента может быть разделена на несколько пакетов и доставлена ​​на сервер несколькими операциями чтения. И наоборот, множественные записи могут быть объединены в один пакет и приняты как один блок.

Правильно написанная программа asyncio не может предполагать, что все данные будут доставлены в одном data_received вызов. Вместо, data_received необходимо собрать входящие данные, распознать, когда данные завершены, и только затем обрабатывать данные. То, как будет распознан конец сообщения, будет зависеть от используемого протокола - например, http предоставляет возможность объявить длину содержимого заранее. Если ожидается, что клиент отключится после отправки сообщения, конец сообщения будет распознан при возникновении условия конца файла.

В последнем случае, data_received будет собирать байты, которые будут обработаны eof_received:

import asyncio, io

class Server(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport
        addr = transport.get_extra_info('peername')
        print('Connection from {}'.format(addr))
        self._data = io.BytesIO()

    def data_received(self, data):
        print('received data->',data)
        self._data.write(data)

    def eof_received(self):
        print('received EOF')
        self._data.write(data)
        data = self._data.getvalue()
        data = data.decode()
        self.message_handle.check_message_id(self, data)

    def connection_lost(self, ex):
        self.transport.close()

Обратите внимание, что код, подобный приведенному выше, гораздо проще писать с использованием потокового API, который является рекомендуемым способом написания современного асинхронного кода:

import asyncio

async def handle_client(r, w):
    data = await r.read()  # read until EOF
    data = data.decode()
    message_handle.check_message_id(data)
    # ...

loop = asyncio.get_event_loop()
message_handle = ...
server = loop.run_until_complete(
    asyncio.start_server(
        lambda r, w: handle_client(r, w, message_handle),
        '127.0.0.1', 8888))
loop.run_forever()
Другие вопросы по тегам