Как я могу реализовать интерактивный клиент веб-сокета с автобаном asyncio?
Я пытаюсь реализовать клиент websocket/wamp, используя autobahn | python и asyncio
и пока он работает, есть части, которые ускользнули от меня.
То, что я действительно пытаюсь сделать, это реализовать WAMP в qt5/QML, но на данный момент это кажется более легким путем.
Этот упрощенный клиент, в основном скопированный из Интернета, действительно работает. Он читает службу времени, когда onJoin
происходит.
Что я хотел бы сделать, это запустить это чтение из внешнего источника.
Запутанный подход, который я выбрал - это запустить asyncio
Цикл событий в потоке, а затем отправить команду через сокет для запуска чтения. Я до сих пор не могу понять, куда поместить подпрограмму / сопрограмму, чтобы ее можно было найти из подпрограммы читателя.
Я подозреваю, что есть более простой способ сделать это, но я еще не нашел его. Предложения приветствуются.
#!/usr/bin/python3
try:
import asyncio
except ImportError:
## Trollius >= 0.3 was renamed
import trollius as asyncio
from autobahn.asyncio import wamp, websocket
import threading
import time
from socket import socketpair
rsock, wsock = socketpair()
def reader() :
data = rsock.recv(100)
print("Received:", data.decode())
class MyFrontendComponent(wamp.ApplicationSession):
def onConnect(self):
self.join(u"realm1")
@asyncio.coroutine
def onJoin(self, details):
print('joined')
## call a remote procedure
##
try:
now = yield from self.call(u'com.timeservice.now')
except Exception as e:
print("Error: {}".format(e))
else:
print("Current time from time service: {}".format(now))
def onLeave(self, details):
self.disconnect()
def onDisconnect(self):
asyncio.get_event_loop().stop()
def start_aloop() :
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
transport_factory = websocket.WampWebSocketClientFactory(session_factory,
debug = False,
debug_wamp = False)
coro = loop.create_connection(transport_factory, '127.0.0.1', 8080)
loop.add_reader(rsock,reader)
loop.run_until_complete(coro)
loop.run_forever()
loop.close()
if __name__ == '__main__':
session_factory = wamp.ApplicationSessionFactory()
session_factory.session = MyFrontendComponent
## 4) now enter the asyncio event loop
print('starting thread')
thread = threading.Thread(target=start_aloop)
thread.start()
time.sleep(5)
print("IN MAIN")
# emulate an outside call
wsock.send(b'a byte string')
2 ответа
Вы можете прослушивать сокет асинхронно внутри цикла обработки событий, используя loop.sock_accept
, Вы можете просто вызвать сопрограмму для настройки сокета внутри onConnect
или же onJoin
:
try:
import asyncio
except ImportError:
## Trollius >= 0.3 was renamed
import trollius as asyncio
from autobahn.asyncio import wamp, websocket
import socket
class MyFrontendComponent(wamp.ApplicationSession):
def onConnect(self):
self.join(u"realm1")
@asyncio.coroutine
def setup_socket(self):
# Create a non-blocking socket
self.sock = socket.socket()
self.sock.setblocking(0)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.bind(('localhost', 8889))
self.sock.listen(5)
loop = asyncio.get_event_loop()
# Wait for connections to come in. When one arrives,
# call the time service and disconnect immediately.
while True:
conn, address = yield from loop.sock_accept(self.sock)
yield from self.call_timeservice()
conn.close()
@asyncio.coroutine
def onJoin(self, details):
print('joined')
# Setup our socket server
asyncio.async(self.setup_socket())
## call a remote procedure
##
yield from self.call_timeservice()
@asyncio.coroutine
def call_timeservice(self):
try:
now = yield from self.call(u'com.timeservice.now')
except Exception as e:
print("Error: {}".format(e))
else:
print("Current time from time service: {}".format(now))
... # The rest is the same
Спасибо за ответ Дано. Не совсем то решение, которое мне было нужно, но оно указало мне правильное направление. Да, я хочу, чтобы клиент выполнял удаленные вызовы RPC из внешнего триггера.
Я придумал следующее, которое позволяет мне передать строку для конкретного вызова (хотя сейчас реализована только одна)
Вот то, что я придумал, хотя я не уверен, насколько это элегантно.
import asyncio
from autobahn.asyncio import wamp, websocket
import threading
import time
import socket
rsock, wsock = socket.socketpair()
class MyFrontendComponent(wamp.ApplicationSession):
def onConnect(self):
self.join(u"realm1")
@asyncio.coroutine
def setup_socket(self):
# Create a non-blocking socket
self.sock = rsock
self.sock.setblocking(0)
loop = asyncio.get_event_loop()
# Wait for connections to come in. When one arrives,
# call the time service and disconnect immediately.
while True:
rcmd = yield from loop.sock_recv(rsock,80)
yield from self.call_service(rcmd.decode())
@asyncio.coroutine
def onJoin(self, details):
# Setup our socket server
asyncio.async(self.setup_socket())
@asyncio.coroutine
def call_service(self,rcmd):
print(rcmd)
try:
now = yield from self.call(rcmd)
except Exception as e:
print("Error: {}".format(e))
else:
print("Current time from time service: {}".format(now))
def onLeave(self, details):
self.disconnect()
def onDisconnect(self):
asyncio.get_event_loop().stop()
def start_aloop() :
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
transport_factory = websocket.WampWebSocketClientFactory(session_factory,
debug = False,
debug_wamp = False)
coro = loop.create_connection(transport_factory, '127.0.0.1', 8080)
loop.run_until_complete(coro)
loop.run_forever()
loop.close()
if __name__ == '__main__':
session_factory = wamp.ApplicationSessionFactory()
session_factory.session = MyFrontendComponent
## 4) now enter the asyncio event loop
print('starting thread')
thread = threading.Thread(target=start_aloop)
thread.start()
time.sleep(5)
wsock.send(b'com.timeservice.now')
time.sleep(5)
wsock.send(b'com.timeservice.now')
time.sleep(5)
wsock.send(b'com.timeservice.now')