Конечная точка многоклиентного потокового Websocket (Python)
Недавно я попал в "криптоманию" и начал писать свои собственные обертки вокруг API на некоторых биржах.
Binance, в частности, имеет конечную точку потокового веб-сокета.
где вы можете передавать данные, но через конечную точку websocket. Я думал, что попробую это самостоятельно, используя sanic.
вот мой маршрут веб-сокета
@ws_routes.websocket("/hello")
async def hello(request, ws):
while True:
await ws.send("hello")
теперь у меня есть 2 клиента на 2 разных машинах, подключающихся к нему
async def main():
async with aiohttp.ClientSession() as session:
ws = await session.ws_connect("ws://192.168.86.31:8000/hello")
while True:
data = await ws.receive()
print(data)
однако только один из клиентов сможет подключиться и получить отправленные данные с сервера. Я предполагаю, что из-за while
зациклить его блокирование и предотвращение соединения другого соединения, потому что это не делает yield
?
как сделать так, чтобы он передавался нескольким клиентам, не блокируя другие соединения?
Я смотрел на добавление большего количества рабочих, и это, кажется, помогает, но я не понимаю, что это не очень масштабируемое решение. потому что каждый клиент будет его собственным работником, и если у вас есть тысячи или даже 10 клиентов, это будет 10 работников 1 на клиента.
Так как же Binance осуществляет потоковую передачу через веб-сокет? или, черт возьми, как работает конечная точка потока Twitter?
Как он может обслуживать бесконечный поток для нескольких одновременных клиентов? потому что в конечном итоге это то, что я пытаюсь сделать
2 ответа
Способ решить это будет что-то вроде этого.
Я использую sanic
фреймворк
class Stream:
def __init__(self):
self._connected_clients = set()
async def __call__(self, *args, **kwargs):
await self.stream(*args, **kwargs)
async def stream(self, request, ws):
self._connected_clients.add(ws)
while True:
disconnected_clients = []
for client in self._connected_clients: # check for disconnected clients
if client.state == 3: # append to a list because error will be raised if removed from set while iterating over it
disconnected_clients.append(client)
for client in disconnected_clients: # remove disconnected clients
self._connected_clients.remove(client)
await asyncio.wait([client.send("Hello") for client in self._connected_clients]))
ws_routes.add_websocket_route(Stream(), "/stream")
- следить за каждым
websocket
сессия - добавить к
list
или жеset
- проверить на недействительность
websocket
сеансы и удалить из вашегоwebsocket
контейнер сессий - сделать
await asyncio.wait([ws_session.send() for ws_session [list of valid sessions]])
который в основном трансляция.
5.profit!
это в основном шаблон дизайна pubsub
Может быть как то так?
import aiohttp
import asyncio
loop = asyncio.get_event_loop()
async def main():
async with aiohttp.ClientSession() as session:
ws = await session.ws_connect("ws://192.168.86.31:8000/hello")
while True:
data = await ws.receive()
print(data)
multiple_coroutines = [main() for _ in range(10)]
loop.run_until_complete(asyncio.gather(*multiple_coroutines))