运行 几个 ApplicationSession 非阻塞地使用 autbahn.asyncio.wamp
Running several ApplicationSessions non-blockingly using autbahn.asyncio.wamp
我正尝试在 python 中同时 运行 两个 autobahn.asyncio.wamp.ApplicationSession
。以前,我按照 this post's answer 中的建议使用高速公路库的修改来完成此操作。我现在
需要更专业的解决方案。
在谷歌搜索了一段时间后,this post appeared quite promising,但使用了 twisted
库,而不是 asyncio
。我无法为 autobahn
库的 asyncio
分支找到类似的解决方案,因为它似乎没有使用 Reactors
.
我遇到的主要问题是 ApplicationRunner.run()
正在阻塞(这就是我之前将其外包给线程的原因),所以我不能 运行 一秒钟 ApplicationRunner
在它之后。
我确实需要同时访问 2 个 websocket 通道,我似乎不能用一个 ApplicationSession
。
到目前为止我的代码:
from autobahn.asyncio.wamp import ApplicationSession
from autobahn.asyncio.wamp import ApplicationRunner
from asyncio import coroutine
import time
channel1 = 'BTC_LTC'
channel2 = 'BTC_XMR'
class LTCComponent(ApplicationSession):
def onConnect(self):
self.join(self.config.realm)
@coroutine
def onJoin(self, details):
def onTicker(*args, **kwargs):
print('LTCComponent', args, kwargs)
try:
yield from self.subscribe(onTicker, channel1)
except Exception as e:
print("Could not subscribe to topic:", e)
class XMRComponent(ApplicationSession):
def onConnect(self):
self.join(self.config.realm)
@coroutine
def onJoin(self, details):
def onTicker(*args, **kwargs):
print('XMRComponent', args, kwargs)
try:
yield from self.subscribe(onTicker, channel2)
except Exception as e:
print("Could not subscribe to topic:", e)
def main():
runner = ApplicationRunner("wss://api.poloniex.com:443", "realm1", extra={})
runner.run(LTCComponent)
runner.run(XMRComponent) # <- is not being called
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
quit()
except Exception as e:
print(time.time(), e)
我对 autobahn
库的了解有限,恐怕文档对我的情况没有多大改善。我在这里忽略了什么吗?一个函数,一个参数,这将使我能够复合我的组件或 运行 它们同时?
可能是与 provided here 类似的解决方案,它实现了替代方案 ApplicationRunner
?
相关主题
Running two ApplicationSessions in twisted
Running Autobahn ApplicationRunner in Thread
Autobahn.wamp.ApplicationSession Source
Autobahn.wamp.Applicationrunner Source
根据要求,使用 multithreading
代码从 @stovfl 的回答中追溯:
Exception in thread Thread-2:
Traceback (most recent call last):
File "/home/nils/anaconda3/lib/python3.5/threading.py", line 914, in _bootstrap_inner
self.run()
File "/home/nils/git/tools/gemini_wss/t2.py", line 27, in run
self.appRunner.run(self.__ApplicationSession)
File "/home/nils/anaconda3/lib/python3.5/site-packages/autobahn- 0.14.1-py3.5.egg/autobahn/asyncio/wamp.py", line 143, in run
transport_factory = WampWebSocketClientFactory(create, url=self.url, serializers=self.serializers)
File "/home/nils/anaconda3/lib/python3.5/site-packages/autobahn- 0.14.1-py3.5.egg/autobahn/asyncio/websocket.py", line 319, in __init__
WebSocketClientFactory.__init__(self, *args, **kwargs)
File "/home/nils/anaconda3/lib/python3.5/site-packages/autobahn- 0.14.1-py3.5.egg/autobahn/asyncio/websocket.py", line 268, in __init__
self.loop = loop or asyncio.get_event_loop()
File "/home/nils/anaconda3/lib/python3.5/asyncio/events.py", line 626, in get_event_loop
return get_event_loop_policy().get_event_loop()
File "/home/nils/anaconda3/lib/python3.5/asyncio/events.py", line 572, in get_event_loop
% threading.current_thread().name)
RuntimeError: There is no current event loop in thread 'Thread-2'.
Exception in thread Thread-1:
**Same as in Thread-2**
...
RuntimeError: There is no current event loop in thread 'Thread-1'.
正如我在 traceback
中看到的那样,我们只进行了第 2 步,共 4 步
From the asyncio docs:
This module provides infrastructure for writing single-threaded concurrent code using coroutines, multiplexing I/O access over sockets and other resources
所以我放弃了使用 multithreading
的第一个提案。
我可以想象以下三个选项:
- 使用
multiprocessing
而不是 multithreading
- 在
asyncio loop
里面用coroutine
做
- 在
def onJoin(self, details)
中的 channels
之间切换
第二个提案,第一个选项使用 multiprocessing
。
我可以启动两个 asyncio loops
,所以 appRunner.run(...)
应该可以。
如果只有 channel
不同,您可以使用 one class ApplicationSession
。
如果你需要传递不同的 class ApplicationSession
将它添加到 args=
class __ApplicationSession(ApplicationSession):
# ...
try:
yield from self.subscribe(onTicker, self.config.extra['channel'])
except Exception as e:
# ...
import multiprocessing as mp
import time
def ApplicationRunner_process(realm, channel):
appRunner = ApplicationRunner("wss://api.poloniex.com:443", realm, extra={'channel': channel})
appRunner.run(__ApplicationSession)
if __name__ == "__main__":
AppRun = [{'process':None, 'channel':'BTC_LTC'},
{'process': None, 'channel': 'BTC_XMR'}]
for app in AppRun:
app['process'] = mp.Process(target = ApplicationRunner_process, args = ('realm1', app['channel'] ))
app['process'].start()
time.sleep(0.1)
AppRun[0]['process'].join()
AppRun[1]['process'].join()
按照方法 you linked for twisted 我设法通过 asyncio 设置获得相同的行为 start_loop=False
import asyncio
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner
runner1 = ApplicationRunner(url, realm, extra={'cli_id': 1})
coro1 = runner1.run(MyApplicationSession, start_loop=False)
runner2 = ApplicationRunner(url, realm, extra={'cli_id': 2})
coro2 = runner2.run(MyApplicationSession, start_loop=False)
asyncio.get_event_loop().run_until_complete(coro1)
asyncio.get_event_loop().run_until_complete(coro2)
asyncio.get_event_loop().run_forever()
class MyApplicationSession(ApplicationSession):
def __init__(self, cfg):
super().__init__(cfg)
self.cli_id = cfg.extra['cli_id']
def onJoin(self, details):
print("session attached", self.cli_id)
我正尝试在 python 中同时 运行 两个 autobahn.asyncio.wamp.ApplicationSession
。以前,我按照 this post's answer 中的建议使用高速公路库的修改来完成此操作。我现在
需要更专业的解决方案。
在谷歌搜索了一段时间后,this post appeared quite promising,但使用了 twisted
库,而不是 asyncio
。我无法为 autobahn
库的 asyncio
分支找到类似的解决方案,因为它似乎没有使用 Reactors
.
我遇到的主要问题是 ApplicationRunner.run()
正在阻塞(这就是我之前将其外包给线程的原因),所以我不能 运行 一秒钟 ApplicationRunner
在它之后。
我确实需要同时访问 2 个 websocket 通道,我似乎不能用一个 ApplicationSession
。
到目前为止我的代码:
from autobahn.asyncio.wamp import ApplicationSession
from autobahn.asyncio.wamp import ApplicationRunner
from asyncio import coroutine
import time
channel1 = 'BTC_LTC'
channel2 = 'BTC_XMR'
class LTCComponent(ApplicationSession):
def onConnect(self):
self.join(self.config.realm)
@coroutine
def onJoin(self, details):
def onTicker(*args, **kwargs):
print('LTCComponent', args, kwargs)
try:
yield from self.subscribe(onTicker, channel1)
except Exception as e:
print("Could not subscribe to topic:", e)
class XMRComponent(ApplicationSession):
def onConnect(self):
self.join(self.config.realm)
@coroutine
def onJoin(self, details):
def onTicker(*args, **kwargs):
print('XMRComponent', args, kwargs)
try:
yield from self.subscribe(onTicker, channel2)
except Exception as e:
print("Could not subscribe to topic:", e)
def main():
runner = ApplicationRunner("wss://api.poloniex.com:443", "realm1", extra={})
runner.run(LTCComponent)
runner.run(XMRComponent) # <- is not being called
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
quit()
except Exception as e:
print(time.time(), e)
我对 autobahn
库的了解有限,恐怕文档对我的情况没有多大改善。我在这里忽略了什么吗?一个函数,一个参数,这将使我能够复合我的组件或 运行 它们同时?
可能是与 provided here 类似的解决方案,它实现了替代方案 ApplicationRunner
?
相关主题
Running two ApplicationSessions in twisted
Running Autobahn ApplicationRunner in Thread
Autobahn.wamp.ApplicationSession Source
Autobahn.wamp.Applicationrunner Source
根据要求,使用 multithreading
代码从 @stovfl 的回答中追溯:
Exception in thread Thread-2:
Traceback (most recent call last):
File "/home/nils/anaconda3/lib/python3.5/threading.py", line 914, in _bootstrap_inner
self.run()
File "/home/nils/git/tools/gemini_wss/t2.py", line 27, in run
self.appRunner.run(self.__ApplicationSession)
File "/home/nils/anaconda3/lib/python3.5/site-packages/autobahn- 0.14.1-py3.5.egg/autobahn/asyncio/wamp.py", line 143, in run
transport_factory = WampWebSocketClientFactory(create, url=self.url, serializers=self.serializers)
File "/home/nils/anaconda3/lib/python3.5/site-packages/autobahn- 0.14.1-py3.5.egg/autobahn/asyncio/websocket.py", line 319, in __init__
WebSocketClientFactory.__init__(self, *args, **kwargs)
File "/home/nils/anaconda3/lib/python3.5/site-packages/autobahn- 0.14.1-py3.5.egg/autobahn/asyncio/websocket.py", line 268, in __init__
self.loop = loop or asyncio.get_event_loop()
File "/home/nils/anaconda3/lib/python3.5/asyncio/events.py", line 626, in get_event_loop
return get_event_loop_policy().get_event_loop()
File "/home/nils/anaconda3/lib/python3.5/asyncio/events.py", line 572, in get_event_loop
% threading.current_thread().name)
RuntimeError: There is no current event loop in thread 'Thread-2'.
Exception in thread Thread-1:
**Same as in Thread-2**
...
RuntimeError: There is no current event loop in thread 'Thread-1'.
正如我在 traceback
中看到的那样,我们只进行了第 2 步,共 4 步
From the asyncio docs:
This module provides infrastructure for writing single-threaded concurrent code using coroutines, multiplexing I/O access over sockets and other resources
所以我放弃了使用 multithreading
的第一个提案。
我可以想象以下三个选项:
- 使用
multiprocessing
而不是multithreading
- 在
asyncio loop
里面用coroutine
做 - 在
def onJoin(self, details)
中的
channels
之间切换
第二个提案,第一个选项使用 multiprocessing
。
我可以启动两个 asyncio loops
,所以 appRunner.run(...)
应该可以。
如果只有 channel
不同,您可以使用 one class ApplicationSession
。
如果你需要传递不同的 class ApplicationSession
将它添加到 args=
class __ApplicationSession(ApplicationSession):
# ...
try:
yield from self.subscribe(onTicker, self.config.extra['channel'])
except Exception as e:
# ...
import multiprocessing as mp
import time
def ApplicationRunner_process(realm, channel):
appRunner = ApplicationRunner("wss://api.poloniex.com:443", realm, extra={'channel': channel})
appRunner.run(__ApplicationSession)
if __name__ == "__main__":
AppRun = [{'process':None, 'channel':'BTC_LTC'},
{'process': None, 'channel': 'BTC_XMR'}]
for app in AppRun:
app['process'] = mp.Process(target = ApplicationRunner_process, args = ('realm1', app['channel'] ))
app['process'].start()
time.sleep(0.1)
AppRun[0]['process'].join()
AppRun[1]['process'].join()
按照方法 you linked for twisted 我设法通过 asyncio 设置获得相同的行为 start_loop=False
import asyncio
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner
runner1 = ApplicationRunner(url, realm, extra={'cli_id': 1})
coro1 = runner1.run(MyApplicationSession, start_loop=False)
runner2 = ApplicationRunner(url, realm, extra={'cli_id': 2})
coro2 = runner2.run(MyApplicationSession, start_loop=False)
asyncio.get_event_loop().run_until_complete(coro1)
asyncio.get_event_loop().run_until_complete(coro2)
asyncio.get_event_loop().run_forever()
class MyApplicationSession(ApplicationSession):
def __init__(self, cfg):
super().__init__(cfg)
self.cli_id = cfg.extra['cli_id']
def onJoin(self, details):
print("session attached", self.cli_id)