运行 几个 ApplicationSession 非阻塞地使用 autbahn.asyncio.wamp

Running several ApplicationSessions non-blockingly using autbahn.asyncio.wamp

我正尝试在 python 中同时 运行 两个 autobahn.asyncio.wamp.ApplicationSession。以前,我按照 this post's answer 中的建议使用高速公路库的修改来完成此操作。我现在 需要更专业的解决方案。

在谷歌搜索了一段时间后,this post appeared quite promising,但使用了 twisted 库,而不是 asyncio。我无法为 autobahn 库的 asyncio 分支找到类似的解决方案,因为它似乎没有使用 Reactors.

我遇到的主要问题是 ApplicationRunner.run() 正在阻塞(这就是我之前将其外包给线程的原因),所以我不能 运行 一秒钟 ApplicationRunner 在它之后。

我确实需要同时访问 2 个 websocket 通道,我似乎不能用一个 ApplicationSession

到目前为止我的代码:

from autobahn.asyncio.wamp import ApplicationSession
from autobahn.asyncio.wamp import ApplicationRunner
from asyncio import coroutine
import time


channel1 = 'BTC_LTC'
channel2 = 'BTC_XMR'

class LTCComponent(ApplicationSession):

    def onConnect(self):
        self.join(self.config.realm)

    @coroutine
    def onJoin(self, details):
        def onTicker(*args, **kwargs):
            print('LTCComponent', args, kwargs)

        try:
            yield from self.subscribe(onTicker, channel1)
        except Exception as e:
            print("Could not subscribe to topic:", e)

class XMRComponent(ApplicationSession):

    def onConnect(self):
        self.join(self.config.realm)

    @coroutine
    def onJoin(self, details):
        def onTicker(*args, **kwargs):
            print('XMRComponent', args, kwargs)

        try:
            yield from self.subscribe(onTicker, channel2)
        except Exception as e:
            print("Could not subscribe to topic:", e)

def main():
    runner = ApplicationRunner("wss://api.poloniex.com:443", "realm1", extra={})
    runner.run(LTCComponent)
    runner.run(XMRComponent) # <- is not being called


if __name__ == "__main__":

    try:
        main()
    except KeyboardInterrupt:
        quit()

    except Exception as e:
        print(time.time(), e)

我对 autobahn 库的了解有限,恐怕文档对我的情况没有多大改善。我在这里忽略了什么吗?一个函数,一个参数,这将使我能够复合我的组件或 运行 它们同时?

可能是与 provided here 类似的解决方案,它实现了替代方案 ApplicationRunner ?


相关主题

Running two ApplicationSessions in twisted

Running Autobahn ApplicationRunner in Thread

Autobahn.wamp.ApplicationSession Source

Autobahn.wamp.Applicationrunner Source


根据要求,使用 multithreading 代码从 @stovfl 的回答中追溯:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/home/nils/anaconda3/lib/python3.5/threading.py", line     914, in _bootstrap_inner
    self.run()
  File "/home/nils/git/tools/gemini_wss/t2.py", line 27, in run
    self.appRunner.run(self.__ApplicationSession)
  File "/home/nils/anaconda3/lib/python3.5/site-packages/autobahn-    0.14.1-py3.5.egg/autobahn/asyncio/wamp.py", line 143,     in run
    transport_factory = WampWebSocketClientFactory(create,         url=self.url,                 serializers=self.serializers)
  File "/home/nils/anaconda3/lib/python3.5/site-packages/autobahn-    0.14.1-py3.5.egg/autobahn/asyncio/websocket.py", line     319, in __init__
    WebSocketClientFactory.__init__(self, *args, **kwargs)
  File "/home/nils/anaconda3/lib/python3.5/site-packages/autobahn-    0.14.1-py3.5.egg/autobahn/asyncio/websocket.py", line     268, in __init__
    self.loop = loop or asyncio.get_event_loop()
  File "/home/nils/anaconda3/lib/python3.5/asyncio/events.py",     line 626, in get_event_loop
    return get_event_loop_policy().get_event_loop()
  File "/home/nils/anaconda3/lib/python3.5/asyncio/events.py",     line 572, in get_event_loop
    % threading.current_thread().name)
RuntimeError: There is no current event loop in thread 'Thread-2'.
Exception in thread Thread-1:
**Same as in Thread-2**
...
RuntimeError: There is no current event loop in thread 'Thread-1'.

正如我在 traceback 中看到的那样,我们只进行了第 2 步,共 4 步

From the asyncio docs:
This module provides infrastructure for writing single-threaded concurrent code using coroutines, multiplexing I/O access over sockets and other resources

所以我放弃了使用 multithreading 的第一个提案。
我可以想象以下三个选项:

  1. 使用 multiprocessing 而不是 multithreading
  2. asyncio loop里面用coroutine
  3. def onJoin(self, details)
  4. 中的 channels 之间切换

第二个提案,第一个选项使用 multiprocessing
我可以启动两个 asyncio loops,所以 appRunner.run(...) 应该可以。

如果只有 channel 不同,您可以使用 one class ApplicationSession。 如果你需要传递不同的 class ApplicationSession 将它添加到 args=

class __ApplicationSession(ApplicationSession):
        # ...
        try:
            yield from self.subscribe(onTicker, self.config.extra['channel'])
        except Exception as e:
            # ...

import multiprocessing as mp
import time

def ApplicationRunner_process(realm, channel):
        appRunner = ApplicationRunner("wss://api.poloniex.com:443", realm, extra={'channel': channel})
        appRunner.run(__ApplicationSession)

if __name__ == "__main__":
    AppRun = [{'process':None, 'channel':'BTC_LTC'},
              {'process': None, 'channel': 'BTC_XMR'}]

    for app in AppRun:
        app['process'] =  mp.Process(target = ApplicationRunner_process, args = ('realm1', app['channel'] ))
        app['process'].start()
        time.sleep(0.1)

    AppRun[0]['process'].join()
    AppRun[1]['process'].join()

按照方法 you linked for twisted 我设法通过 asyncio 设置获得相同的行为 start_loop=False

import asyncio
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner

runner1 = ApplicationRunner(url, realm, extra={'cli_id': 1})
coro1 = runner1.run(MyApplicationSession, start_loop=False)

runner2 = ApplicationRunner(url, realm, extra={'cli_id': 2})
coro2 = runner2.run(MyApplicationSession, start_loop=False)

asyncio.get_event_loop().run_until_complete(coro1)
asyncio.get_event_loop().run_until_complete(coro2)
asyncio.get_event_loop().run_forever()

class MyApplicationSession(ApplicationSession):

    def __init__(self, cfg):
        super().__init__(cfg)
        self.cli_id = cfg.extra['cli_id']

   def onJoin(self, details):
        print("session attached", self.cli_id)