aiohttp - Webssocket - msg в буфере после ws.close()?

Я получаю обновления о состоянии некоторых финансовых рынков с сервера веб-сокетов. Однако из-за ограничений, налагаемых сервером, я должен периодически создавать новое соединение ws.

Первое сообщение из вновь установленного соединения - это начальное состояние, а последующие сообщения - это маленькие "дельта" сообщения, которые просто дают вам знать, как обновить состояние. Я не включил детали реализации для этого, но я повторно инициализирую свой внутренний объект состояния, когда я устанавливаю новое соединение.

У меня есть следующая сопрограмма для обработки входящих сообщений websocket, а также периодически закрывать и устанавливать новое соединение ws:

async def foo():
    async with session.ws_connect('wss://example.com') as ws:
        initialize_time = datetime.now()
        async for msg in ws:
            if datetime.now() - initialize_time > timedelta(seconds=3000):
                await ws.close()
                loop.create_task(foo())
                return
            else:
                # process the msg

Возможно, это сомнительный дизайн. (Это?)

Мой опыт показывает, что я не получаю ошибок в течение времени (1 целый час), я не закрываю и повторно открываю соединение, но всегда получаю ошибку вскоре после повторного открытия в моей логике обновления состояния - например, она пытается обновить маркет с идентификатором не в текущем состоянии.

Поэтому я подозреваю, что сообщения в буфере первого ws-соединения, когда оно закрыто, затем впоследствии извлекаются циклом "async for msg in ws:" нового ws-соединения. Я мог бы лаять не на то дерево. Есть ли способ удалить сообщения в данный момент в буфере? Есть другие идеи?

1 ответ

Вы не можете полагаться на непрочитанные сообщения в буфере websocket. Доставка сообщений через веб-сокет не является атомарной операцией, есть возможность иметь сообщения в полете, которые были отправлены одноранговыми, но еще не доставлены на узел в момент закрытия веб-сокета.

Другие вопросы по тегам