Запуск нескольких ApplicationSessions без блокировки с использованием autbahn.asyncio.wamp

Я пытаюсь запустить два autobahn.asyncio.wamp.ApplicationSession в Python одновременно. Ранее я делал это, используя модификацию библиотеки autobahn, как предложено в ответе на этот пост. Теперь мне требуется более профессиональное решение.

После некоторого поиска в Google этот пост оказался довольно многообещающим, но использует twisted библиотека, а не asyncio, Я не смог найти аналогичное решение для asyncio филиал autobahn библиотека, так как она не использует Reactors,

Основная проблема у меня заключается в том, что ApplicationRunner.run() блокирует (вот почему я ранее передал его в поток), поэтому я не могу просто запустить секунду ApplicationRunner после этого.

Мне нужно получить доступ к двум каналам веб-сокета одновременно, что я не могу сделать с одним ApplicationSession,

Мой код до сих пор:

from autobahn.asyncio.wamp import ApplicationSession
from autobahn.asyncio.wamp import ApplicationRunner
from asyncio import coroutine
import time


channel1 = 'BTC_LTC'
channel2 = 'BTC_XMR'

class LTCComponent(ApplicationSession):

    def onConnect(self):
        self.join(self.config.realm)

    @coroutine
    def onJoin(self, details):
        def onTicker(*args, **kwargs):
            print('LTCComponent', args, kwargs)

        try:
            yield from self.subscribe(onTicker, channel1)
        except Exception as e:
            print("Could not subscribe to topic:", e)

class XMRComponent(ApplicationSession):

    def onConnect(self):
        self.join(self.config.realm)

    @coroutine
    def onJoin(self, details):
        def onTicker(*args, **kwargs):
            print('XMRComponent', args, kwargs)

        try:
            yield from self.subscribe(onTicker, channel2)
        except Exception as e:
            print("Could not subscribe to topic:", e)

def main():
    runner = ApplicationRunner("wss://api.poloniex.com:443", "realm1", extra={})
    runner.run(LTCComponent)
    runner.run(XMRComponent) # <- is not being called


if __name__ == "__main__":

    try:
        main()
    except KeyboardInterrupt:
        quit()

    except Exception as e:
        print(time.time(), e)

Мои знания о autobahn библиотека ограничена, и, боюсь, документация не сильно улучшит мою ситуацию. Я что-то пропускаю здесь? Функция, параметр, который позволил бы мне либо объединить мои компоненты, либо запустить их оба одновременно?

Возможно, аналогичное решение, как представлено здесь, который реализует альтернативу ApplicationRunner?


Похожие темы

Запуск двух ApplicationSessions в витой

Запуск Autobahn ApplicationRunner в теме

Autobahn.wamp.ApplicationSession Source

Источник Autobahn.wamp.Applicationrunner


По требованию, ответ от @stovfl с использованием multithreading код:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/home/nils/anaconda3/lib/python3.5/threading.py", line     914, in _bootstrap_inner
    self.run()
  File "/home/nils/git/tools/gemini_wss/t2.py", line 27, in run
    self.appRunner.run(self.__ApplicationSession)
  File "/home/nils/anaconda3/lib/python3.5/site-packages/autobahn-    0.14.1-py3.5.egg/autobahn/asyncio/wamp.py", line 143,     in run
    transport_factory = WampWebSocketClientFactory(create,         url=self.url,                 serializers=self.serializers)
  File "/home/nils/anaconda3/lib/python3.5/site-packages/autobahn-    0.14.1-py3.5.egg/autobahn/asyncio/websocket.py", line     319, in __init__
    WebSocketClientFactory.__init__(self, *args, **kwargs)
  File "/home/nils/anaconda3/lib/python3.5/site-packages/autobahn-    0.14.1-py3.5.egg/autobahn/asyncio/websocket.py", line     268, in __init__
    self.loop = loop or asyncio.get_event_loop()
  File "/home/nils/anaconda3/lib/python3.5/asyncio/events.py",     line 626, in get_event_loop
    return get_event_loop_policy().get_event_loop()
  File "/home/nils/anaconda3/lib/python3.5/asyncio/events.py",     line 572, in get_event_loop
    % threading.current_thread().name)
RuntimeError: There is no current event loop in thread 'Thread-2'.
Exception in thread Thread-1:
**Same as in Thread-2**
...
RuntimeError: There is no current event loop in thread 'Thread-1'.

1 ответ

Решение

Как я вижу из tracebackмы достигли только шага 2 из 4

Из документов Asyncio:
Этот модуль предоставляет инфраструктуру для написания однопоточного параллельного кода с использованием сопрограмм, мультиплексирования доступа ввода-вывода через сокеты и другие ресурсы.

Поэтому я оставляю свое первое предложение, используя multithreading,
Я мог представить следующие три варианта:

  1. Сделай это с multiprocessing вместо multithreading
  2. Сделай это с coroutine внутри asyncio loop
  3. Переключаться между channels в def onJoin(self, details)

Второе предложение, первый вариант с использованием multiprocessing,
Я могу начать два asyncio loops, так appRunner.run(...) должно сработать.

Вы можете использовать один class ApplicationSession если channel единственные разные. Если вам нужно пройти разные class ApplicationSession добавить его в args=

class __ApplicationSession(ApplicationSession):
        # ...
        try:
            yield from self.subscribe(onTicker, self.config.extra['channel'])
        except Exception as e:
            # ...

import multiprocessing as mp
import time

def ApplicationRunner_process(realm, channel):
        appRunner = ApplicationRunner("wss://api.poloniex.com:443", realm, extra={'channel': channel})
        appRunner.run(__ApplicationSession)

if __name__ == "__main__":
    AppRun = [{'process':None, 'channel':'BTC_LTC'},
              {'process': None, 'channel': 'BTC_XMR'}]

    for app in AppRun:
        app['process'] =  mp.Process(target = ApplicationRunner_process, args = ('realm1', app['channel'] ))
        app['process'].start()
        time.sleep(0.1)

    AppRun[0]['process'].join()
    AppRun[1]['process'].join()

Следуя подходу, который вы связали для витого, мне удалось получить то же поведение с установкой asyncio start_loop=False

import asyncio
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner

runner1 = ApplicationRunner(url, realm, extra={'cli_id': 1})
coro1 = runner1.run(MyApplicationSession, start_loop=False)

runner2 = ApplicationRunner(url, realm, extra={'cli_id': 2})
coro2 = runner2.run(MyApplicationSession, start_loop=False)

asyncio.get_event_loop().run_until_complete(coro1)
asyncio.get_event_loop().run_until_complete(coro2)
asyncio.get_event_loop().run_forever()

class MyApplicationSession(ApplicationSession):

    def __init__(self, cfg):
        super().__init__(cfg)
        self.cli_id = cfg.extra['cli_id']

   def onJoin(self, details):
        print("session attached", self.cli_id)
Другие вопросы по тегам