aiohttp - Webssocket - msg в буфере после ws.close()?
Я получаю обновления о состоянии некоторых финансовых рынков с сервера веб-сокетов. Однако из-за ограничений, налагаемых сервером, я должен периодически создавать новое соединение ws.
Первое сообщение из вновь установленного соединения - это начальное состояние, а последующие сообщения - это маленькие "дельта" сообщения, которые просто дают вам знать, как обновить состояние. Я не включил детали реализации для этого, но я повторно инициализирую свой внутренний объект состояния, когда я устанавливаю новое соединение.
У меня есть следующая сопрограмма для обработки входящих сообщений websocket, а также периодически закрывать и устанавливать новое соединение ws:
async def foo():
async with session.ws_connect('wss://example.com') as ws:
initialize_time = datetime.now()
async for msg in ws:
if datetime.now() - initialize_time > timedelta(seconds=3000):
await ws.close()
loop.create_task(foo())
return
else:
# process the msg
Возможно, это сомнительный дизайн. (Это?)
Мой опыт показывает, что я не получаю ошибок в течение времени (1 целый час), я не закрываю и повторно открываю соединение, но всегда получаю ошибку вскоре после повторного открытия в моей логике обновления состояния - например, она пытается обновить маркет с идентификатором не в текущем состоянии.
Поэтому я подозреваю, что сообщения в буфере первого ws-соединения, когда оно закрыто, затем впоследствии извлекаются циклом "async for msg in ws:" нового ws-соединения. Я мог бы лаять не на то дерево. Есть ли способ удалить сообщения в данный момент в буфере? Есть другие идеи?
1 ответ
Вы не можете полагаться на непрочитанные сообщения в буфере websocket. Доставка сообщений через веб-сокет не является атомарной операцией, есть возможность иметь сообщения в полете, которые были отправлены одноранговыми, но еще не доставлены на узел в момент закрытия веб-сокета.