Конечная точка многоклиентного потокового Websocket (Python)

Недавно я попал в "криптоманию" и начал писать свои собственные обертки вокруг API на некоторых биржах.

Binance, в частности, имеет конечную точку потокового веб-сокета.

где вы можете передавать данные, но через конечную точку websocket. Я думал, что попробую это самостоятельно, используя sanic.

вот мой маршрут веб-сокета

@ws_routes.websocket("/hello")
async def hello(request, ws):
    while True:
        await ws.send("hello")

теперь у меня есть 2 клиента на 2 разных машинах, подключающихся к нему

async def main():
    async with aiohttp.ClientSession() as session:

        ws  = await session.ws_connect("ws://192.168.86.31:8000/hello")
        while True:
            data = await ws.receive()
            print(data)

однако только один из клиентов сможет подключиться и получить отправленные данные с сервера. Я предполагаю, что из-за while зациклить его блокирование и предотвращение соединения другого соединения, потому что это не делает yield?

как сделать так, чтобы он передавался нескольким клиентам, не блокируя другие соединения?

Я смотрел на добавление большего количества рабочих, и это, кажется, помогает, но я не понимаю, что это не очень масштабируемое решение. потому что каждый клиент будет его собственным работником, и если у вас есть тысячи или даже 10 клиентов, это будет 10 работников 1 на клиента.

Так как же Binance осуществляет потоковую передачу через веб-сокет? или, черт возьми, как работает конечная точка потока Twitter?

Как он может обслуживать бесконечный поток для нескольких одновременных клиентов? потому что в конечном итоге это то, что я пытаюсь сделать

2 ответа

Решение

Способ решить это будет что-то вроде этого.

Я использую sanic фреймворк

class Stream:
    def __init__(self):
        self._connected_clients = set()

    async def __call__(self, *args, **kwargs):
        await self.stream(*args, **kwargs)

    async def stream(self, request, ws):
        self._connected_clients.add(ws)

        while True:
            disconnected_clients = []
            for client in self._connected_clients:  # check for disconnected clients
                if client.state == 3:  # append to a list because error will be raised if removed from set while iterating over it 
                    disconnected_clients.append(client)
            for client in disconnected_clients:  # remove disconnected clients
                self._connected_clients.remove(client)

            await asyncio.wait([client.send("Hello") for client in self._connected_clients]))


ws_routes.add_websocket_route(Stream(), "/stream")
  1. следить за каждым websocket сессия
  2. добавить к list или же set
  3. проверить на недействительность websocket сеансы и удалить из вашего websocket контейнер сессий
  4. сделать await asyncio.wait([ws_session.send() for ws_session [list of valid sessions]]) который в основном трансляция.

5.profit!

это в основном шаблон дизайна pubsub

Может быть как то так?

import aiohttp
import asyncio
loop = asyncio.get_event_loop()
async def main():
    async with aiohttp.ClientSession() as session:
        ws  = await session.ws_connect("ws://192.168.86.31:8000/hello")
        while True:
            data = await ws.receive()
            print(data)

multiple_coroutines = [main() for _ in range(10)]
loop.run_until_complete(asyncio.gather(*multiple_coroutines))
Другие вопросы по тегам