Python 中的 ZeroMQ N 到 N 异步模式
ZeroMQ N to N async pattern in Python
N-代理-N 发布-订阅
与问题 N to N async pattern in ZeroMQ? 类似,但不幸的是,该问题从未收到有效代码的答案。
我正在尝试按照指南中的描述实施 Pub-Sub 网络:http://zguide.zeromq.org/py:all#The-Dynamic-Discovery-Problem(N-proxy-N 风格的小型消息代理)。遗憾的是,该指南未提供任何代码示例。
我已经尝试使用 PyZMQ 实现一个 Hello World 示例,我想我已经接近了,但是我遇到了一些我不知道如何处理的错误。很抱歉使用 asyncio(我对这个比线程更舒服)。
代码
"""Example using zmq to create a PubSub node_topic similar to a ROS topic"""
# Copyright (c) Stef van der Struijk <stefstruijk@protonmail.ch>.
# This example is in the public domain (CC-0)
# http://zguide.zeromq.org/py:all#The-Dynamic-Discovery-Problem
import asyncio
import zmq.asyncio
from zmq.asyncio import Context
import traceback
import logging
# N-proxy-M pattern: a subscriber which passes messages through a proxy through a publisher
class PubSubTopic:
def __init__(self, address='127.0.0.1', port1='5566', port2='5567'):
# get ZeroMQ version
print("Current libzmq version is %s" % zmq.zmq_version())
print("Current pyzmq version is %s" % zmq.pyzmq_version())
self.context = Context.instance()
# 2 sockets, because we can only bind once to a socket (as opposed to connect)
self.url1 = "tcp://{}:{}".format(address, port1)
self.url2 = "tcp://{}:{}".format(address, port2)
# start proxy, pubs and subs async; demonstration purpose only, probably better in separate threads
asyncio.get_event_loop().run_until_complete(asyncio.wait([
self.xpub_xsub_proxy(), # commented out for different error
self.pub_hello_world(),
self.pub_hello_world(lang='jp'),
self.sub_hello_world(),
self.sub_hello_world(lang='jp'),
]))
# N publishers to 1 sub; proxy 1 sub to 1 pub; publish to M subscribers
async def xpub_xsub_proxy(self):
# no traceback with zmq.asyncio and no try statement
try:
print("Init proxy")
# Socket subscribing to publishers
frontend_pubs = self.context.socket(zmq.XSUB)
frontend_pubs.bind(self.url1)
# Socket publishing to subscribers
backend_subs = self.context.socket(zmq.XPUB)
backend_subs.bind(self.url2)
print("Try: Proxy... CONNECT!")
zmq.proxy(frontend_pubs, backend_subs)
print("CONNECT successful!")
except Exception as e:
print("Error with proxy :(")
# print(e)
logging.error(traceback.format_exc())
print()
# test case: 2 pubs to 1 topic
async def pub_hello_world(self, lang='en'):
# no traceback with zmq.asyncio and no try statement
try:
print("Init pub {}".format(lang))
# connect, because many publishers - 1 subscriber
pub = self.context.socket(zmq.PUB)
pub.connect(self.url1)
if lang == 'en':
message = "Hello World"
sleep = 1
else:
message = "Hello Sekai" # Japanese
sleep = 2
# wait proxy and subs to b ready
await asyncio.sleep(.5)
# keep publishing "Hello World" / "Hello Sekai" messages
print("Pub {}: Going to pub messages!".format(lang))
while True:
# publish message to topic 'world'
# multipart: topic, message; async always needs `send_multipart()`?
await pub.send_multipart([lang.encode('ascii'), message.encode('ascii')])
print("Pub {}: Have send msg".format(lang))
# slow down message publication
await asyncio.sleep(sleep)
except Exception as e:
print("Error with pub {}".format(lang))
# print(e)
logging.error(traceback.format_exc())
print()
# test case: 2 subs to 1 topic
async def sub_hello_world(self, lang='en'):
# no traceback with zmq.asyncio and no try statement
try:
print("Init sub {}".format(lang))
# connect, because many subscribers - 1 (proxy) pub
sub = self.context.socket(zmq.SUB)
sub.connect(self.url2)
# subscribe to topic 'en' or 'jp'
sub.setsockopt(zmq.SUBSCRIBE, lang.encode('ascii'))
# wait proxy to be ready; necessary?
await asyncio.sleep(.2)
# keep listening to all published message, filtered on topic
print("Sub {}: Going to wait for messages!".format(lang))
while True:
msg_received = await sub.recv_multipart()
print("sub {}: {}".format(lang, msg_received))
except Exception as e:
print("Error with sub {}".format(lang))
# print(e)
logging.error(traceback.format_exc())
print()
if __name__ == '__main__':
PubSubTopic()
错误
代理错误
当我不注释掉代理函数时,得到如下回溯
python pub_sub_topic.py
Current libzmq version is 4.2.2
Current pyzmq version is 16.0.2
Init proxy
Try: Proxy... CONNECT!
^CTraceback (most recent call last):
File "pub_sub_topic.py", line 139, in <module>
PubSubTopic()
File "pub_sub_topic.py", line 43, in __init__
self.sub_hello_world(lang='jp'),
File "/home/*user*/anaconda3/lib/python3.6/asyncio/base_events.py", line 454, in run_until_complete
self.run_forever()
File "/home/*user*/anaconda3/lib/python3.6/asyncio/base_events.py", line 421, in run_forever
self._run_once()
File "/home/*user*/anaconda3/lib/python3.6/asyncio/base_events.py", line 1426, in _run_once
handle._run()
File "/home/*user*/anaconda3/lib/python3.6/asyncio/events.py", line 127, in _run
self._callback(*self._args)
File "pub_sub_topic.py", line 62, in xpub_xsub_proxy
zmq.proxy(frontend_pubs, backend_subs)
File "zmq/backend/cython/_device.pyx", line 95, in zmq.backend.cython._device.proxy (zmq/backend/cython/_device.c:1824)
File "zmq/backend/cython/checkrc.pxd", line 12, in zmq.backend.cython.checkrc._check_rc (zmq/backend/cython/_device.c:1991)
KeyboardInterrupt
订阅者错误
如果我注释掉代理函数 (# self.xpub_xsub_proxy(),
),我会得到以下回溯
python pub_sub_topic.py
Current libzmq version is 4.2.2
Current pyzmq version is 16.0.2
Init sub en
Init sub jp
Init pub en
Init pub jp
Sub en: Going to wait for messages!
Error with sub en
ERROR:root:Traceback (most recent call last):
File "pub_sub_topic.py", line 128, in sub_hello_world
msg_received = await sub.recv_multipart()
File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/eventloop/future.py", line 170, in recv_multipart
dict(flags=flags, copy=copy, track=track)
File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/eventloop/future.py", line 321, in _add_recv_event
self._add_io_state(self._READ)
File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/asyncio/__init__.py", line 294, in _add_io_state
self.io_loop.add_reader(self, self._handle_recv)
File "/home/*user*/anaconda3/lib/python3.6/asyncio/selector_events.py", line 337, in add_reader
return self._add_reader(fd, callback, *args)
File "/home/*user*/anaconda3/lib/python3.6/asyncio/selector_events.py", line 264, in _add_reader
key = self._selector.get_key(fd)
File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 189, in get_key
return mapping[fileobj]
File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 70, in __getitem__
fd = self._selector._fileobj_lookup(fileobj)
File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 224, in _fileobj_lookup
return _fileobj_to_fd(fileobj)
File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 39, in _fileobj_to_fd
"{!r}".format(fileobj)) from None
ValueError: Invalid file object: <zmq.asyncio.Socket object at 0x7fa90a4a7528>
Exception ignored in: <bound method Socket.__del__ of <zmq.asyncio.Socket object at 0x7fa90a4a7528>>
Traceback (most recent call last):
File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/sugar/socket.py", line 70, in __del__
self.close()
File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/eventloop/future.py", line 160, in close
self._clear_io_state()
File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/asyncio/__init__.py", line 316, in _clear_io_state
self._drop_io_state(self._state)
File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/asyncio/__init__.py", line 303, in _drop_io_state
self.io_loop.remove_reader(self)
File "/home/*user*/anaconda3/lib/python3.6/asyncio/selector_events.py", line 342, in remove_reader
return self._remove_reader(fd)
File "/home/*user*/anaconda3/lib/python3.6/asyncio/selector_events.py", line 279, in _remove_reader
key = self._selector.get_key(fd)
File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 189, in get_key
return mapping[fileobj]
File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 70, in __getitem__
fd = self._selector._fileobj_lookup(fileobj)
File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 224, in _fileobj_lookup
return _fileobj_to_fd(fileobj)
File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 39, in _fileobj_to_fd
"{!r}".format(fileobj)) from None
ValueError: Invalid file object: <zmq.asyncio.Socket object at 0x7fa90a4a7528>
Sub jp: Going to wait for messages!
*snip* Same error as 'Sub en' *snip*
Pub en: Going to pub messages!
Pub en: Have send msg
Pub jp: Going to pub messages!
Pub jp: Have send msg
Pub en: Have send msg
Pub jp: Have send msg
Pub en: Have send msg
^CTraceback (most recent call last):
File "pub_sub_topic.py", line 139, in <module>
PubSubTopic()
File "pub_sub_topic.py", line 43, in __init__
self.sub_hello_world(lang='jp'),
File "/home/*user*/anaconda3/lib/python3.6/asyncio/base_events.py", line 454, in run_until_complete
self.run_forever()
File "/home/*user*/anaconda3/lib/python3.6/asyncio/base_events.py", line 421, in run_forever
self._run_once()
File "/home/*user*/anaconda3/lib/python3.6/asyncio/base_events.py", line 1390, in _run_once
event_list = self._selector.select(timeout)
File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 445, in select
fd_event_list = self._epoll.poll(timeout, max_ev)
KeyboardInterrupt
系统信息
- Ubuntu16.04
- Python 3.6(通过 Anaconda)
- libzmq 版本 4.2.2
- pyzmq 版本 16.0.2
你绝对不应该评论代理功能。问题是因为 zmq.proxy 函数永远阻塞,而你 运行 它与 "run_until_complete" 事件循环。您应该将事件循环执行类型更改为 run_forever.
一如既往,答案很简单。通过将它分成 3 个脚本,我们不必使用线程和异步编程,因此这应该可以帮助更多人。
打开 6 个终端,并在每个终端中 运行 以下命令:
python proxy_topic.py
#代理/ROS主题
python proxy_pub.py
# 发布 "Hello World"
python proxy_pub.py jp
# 发布 "Hello Sekai"
python proxy_sub.py
# 接收所有消息
python proxy_sub.py en
# 仅接收 "Hello World";没必要
python proxy_sub.py jp
# 仅接收 "Hello Sekai";没必要
proxy_topic.py
import sys
import zmq
from zmq import Context
class ProxyPub:
def __init__(self, address='127.0.0.1', port1='5566', port2='5567'):
# get ZeroMQ version
print("Current libzmq version is %s" % zmq.zmq_version())
print("Current pyzmq version is %s" % zmq.pyzmq_version())
self.context = Context.instance()
# 2 sockets, because we can only bind once to a socket (as opposed to connect)
self.url1 = "tcp://{}:{}".format(address, port1)
self.url2 = "tcp://{}:{}".format(address, port2)
self.xpub_xsub_proxy()
# N publishers to 1 sub; proxy 1 sub to 1 pub; publish to M subscribers
def xpub_xsub_proxy(self):
print("Init proxy")
# Socket subscribing to publishers
frontend_pubs = self.context.socket(zmq.XSUB)
frontend_pubs.bind(self.url1)
# Socket publishing to subscribers
backend_subs = self.context.socket(zmq.XPUB)
backend_subs.bind(self.url2)
print("Try: Proxy... CONNECT!")
zmq.proxy(frontend_pubs, backend_subs)
print("CONNECT successful!")
if __name__ == '__main__':
print("Arguments given: {}".format(sys.argv))
ProxyPub()
proxy_pub.py
import sys
import zmq
from zmq import Context
import time
class ProxyPub:
def __init__(self, lang='en', address='127.0.0.1', port='5566'):
# get ZeroMQ version
print("Current libzmq version is %s" % zmq.zmq_version())
print("Current pyzmq version is %s" % zmq.pyzmq_version())
self.context = Context.instance()
self.url = "tcp://{}:{}".format(address, port)
self.pub_hello_world(lang)
def pub_hello_world(self, lang):
print("Init pub {}".format(lang))
# connect, because many publishers - 1 subscriber
pub = self.context.socket(zmq.PUB)
pub.connect(self.url)
if lang == 'en':
message = "Hello World"
sleep = 1
else:
message = "Hello Sekai" # Japanese
sleep = 2
# wait proxy and subs to b ready
time.sleep(.5)
# keep publishing "Hello World" / "Hello Sekai" messages
print("Pub {}: Going to pub messages!".format(lang))
while True:
# publish message to topic 'world'
# multipart: topic, message; async always needs `send_multipart()`?
pub.send_multipart([lang.encode('ascii'), message.encode('ascii')])
print("Pub {}: Have send msg".format(lang))
# slow down message publication
time.sleep(sleep)
if __name__ == '__main__':
print("Arguments given: {}".format(sys.argv))
if len(sys.argv) == 1:
ProxyPub()
elif len(sys.argv) == 2:
ProxyPub(lang=sys.argv[1])
else:
print("Too many arguments")
proxy_sub.py
import sys
import zmq
from zmq import Context
import time
class ProxyPub:
def __init__(self, lang='', address='127.0.0.1', port='5567'):
# get ZeroMQ version
print("Current libzmq version is %s" % zmq.zmq_version())
print("Current pyzmq version is %s" % zmq.pyzmq_version())
self.context = Context.instance()
self.url = "tcp://{}:{}".format(address, port)
self.sub_hello_world(lang)
def sub_hello_world(self, lang):
print("Init sub {}".format(lang))
# connect, because many subscribers - 1 (proxy) pub
sub = self.context.socket(zmq.SUB)
sub.connect(self.url)
# subscribe to topic 'en' or 'jp'
sub.setsockopt(zmq.SUBSCRIBE, lang.encode('ascii'))
# wait proxy to be ready; necessary?
time.sleep(.2)
# keep listening to all published message, filtered on topic
print("Sub {}: Going to wait for messages!".format(lang))
while True:
msg_received = sub.recv_multipart()
print("sub {}: {}".format(lang, msg_received))
if __name__ == '__main__':
print("Arguments given: {}".format(sys.argv))
if len(sys.argv) == 1:
ProxyPub()
elif len(sys.argv) == 2:
ProxyPub(lang=sys.argv[1])
else:
print("Too many arguments")
N-代理-N 发布-订阅
与问题 N to N async pattern in ZeroMQ? 类似,但不幸的是,该问题从未收到有效代码的答案。
我正在尝试按照指南中的描述实施 Pub-Sub 网络:http://zguide.zeromq.org/py:all#The-Dynamic-Discovery-Problem(N-proxy-N 风格的小型消息代理)。遗憾的是,该指南未提供任何代码示例。
我已经尝试使用 PyZMQ 实现一个 Hello World 示例,我想我已经接近了,但是我遇到了一些我不知道如何处理的错误。很抱歉使用 asyncio(我对这个比线程更舒服)。
代码
"""Example using zmq to create a PubSub node_topic similar to a ROS topic"""
# Copyright (c) Stef van der Struijk <stefstruijk@protonmail.ch>.
# This example is in the public domain (CC-0)
# http://zguide.zeromq.org/py:all#The-Dynamic-Discovery-Problem
import asyncio
import zmq.asyncio
from zmq.asyncio import Context
import traceback
import logging
# N-proxy-M pattern: a subscriber which passes messages through a proxy through a publisher
class PubSubTopic:
def __init__(self, address='127.0.0.1', port1='5566', port2='5567'):
# get ZeroMQ version
print("Current libzmq version is %s" % zmq.zmq_version())
print("Current pyzmq version is %s" % zmq.pyzmq_version())
self.context = Context.instance()
# 2 sockets, because we can only bind once to a socket (as opposed to connect)
self.url1 = "tcp://{}:{}".format(address, port1)
self.url2 = "tcp://{}:{}".format(address, port2)
# start proxy, pubs and subs async; demonstration purpose only, probably better in separate threads
asyncio.get_event_loop().run_until_complete(asyncio.wait([
self.xpub_xsub_proxy(), # commented out for different error
self.pub_hello_world(),
self.pub_hello_world(lang='jp'),
self.sub_hello_world(),
self.sub_hello_world(lang='jp'),
]))
# N publishers to 1 sub; proxy 1 sub to 1 pub; publish to M subscribers
async def xpub_xsub_proxy(self):
# no traceback with zmq.asyncio and no try statement
try:
print("Init proxy")
# Socket subscribing to publishers
frontend_pubs = self.context.socket(zmq.XSUB)
frontend_pubs.bind(self.url1)
# Socket publishing to subscribers
backend_subs = self.context.socket(zmq.XPUB)
backend_subs.bind(self.url2)
print("Try: Proxy... CONNECT!")
zmq.proxy(frontend_pubs, backend_subs)
print("CONNECT successful!")
except Exception as e:
print("Error with proxy :(")
# print(e)
logging.error(traceback.format_exc())
print()
# test case: 2 pubs to 1 topic
async def pub_hello_world(self, lang='en'):
# no traceback with zmq.asyncio and no try statement
try:
print("Init pub {}".format(lang))
# connect, because many publishers - 1 subscriber
pub = self.context.socket(zmq.PUB)
pub.connect(self.url1)
if lang == 'en':
message = "Hello World"
sleep = 1
else:
message = "Hello Sekai" # Japanese
sleep = 2
# wait proxy and subs to b ready
await asyncio.sleep(.5)
# keep publishing "Hello World" / "Hello Sekai" messages
print("Pub {}: Going to pub messages!".format(lang))
while True:
# publish message to topic 'world'
# multipart: topic, message; async always needs `send_multipart()`?
await pub.send_multipart([lang.encode('ascii'), message.encode('ascii')])
print("Pub {}: Have send msg".format(lang))
# slow down message publication
await asyncio.sleep(sleep)
except Exception as e:
print("Error with pub {}".format(lang))
# print(e)
logging.error(traceback.format_exc())
print()
# test case: 2 subs to 1 topic
async def sub_hello_world(self, lang='en'):
# no traceback with zmq.asyncio and no try statement
try:
print("Init sub {}".format(lang))
# connect, because many subscribers - 1 (proxy) pub
sub = self.context.socket(zmq.SUB)
sub.connect(self.url2)
# subscribe to topic 'en' or 'jp'
sub.setsockopt(zmq.SUBSCRIBE, lang.encode('ascii'))
# wait proxy to be ready; necessary?
await asyncio.sleep(.2)
# keep listening to all published message, filtered on topic
print("Sub {}: Going to wait for messages!".format(lang))
while True:
msg_received = await sub.recv_multipart()
print("sub {}: {}".format(lang, msg_received))
except Exception as e:
print("Error with sub {}".format(lang))
# print(e)
logging.error(traceback.format_exc())
print()
if __name__ == '__main__':
PubSubTopic()
错误
代理错误
当我不注释掉代理函数时,得到如下回溯
python pub_sub_topic.py
Current libzmq version is 4.2.2
Current pyzmq version is 16.0.2
Init proxy
Try: Proxy... CONNECT!
^CTraceback (most recent call last):
File "pub_sub_topic.py", line 139, in <module>
PubSubTopic()
File "pub_sub_topic.py", line 43, in __init__
self.sub_hello_world(lang='jp'),
File "/home/*user*/anaconda3/lib/python3.6/asyncio/base_events.py", line 454, in run_until_complete
self.run_forever()
File "/home/*user*/anaconda3/lib/python3.6/asyncio/base_events.py", line 421, in run_forever
self._run_once()
File "/home/*user*/anaconda3/lib/python3.6/asyncio/base_events.py", line 1426, in _run_once
handle._run()
File "/home/*user*/anaconda3/lib/python3.6/asyncio/events.py", line 127, in _run
self._callback(*self._args)
File "pub_sub_topic.py", line 62, in xpub_xsub_proxy
zmq.proxy(frontend_pubs, backend_subs)
File "zmq/backend/cython/_device.pyx", line 95, in zmq.backend.cython._device.proxy (zmq/backend/cython/_device.c:1824)
File "zmq/backend/cython/checkrc.pxd", line 12, in zmq.backend.cython.checkrc._check_rc (zmq/backend/cython/_device.c:1991)
KeyboardInterrupt
订阅者错误
如果我注释掉代理函数 (# self.xpub_xsub_proxy(),
),我会得到以下回溯
python pub_sub_topic.py
Current libzmq version is 4.2.2
Current pyzmq version is 16.0.2
Init sub en
Init sub jp
Init pub en
Init pub jp
Sub en: Going to wait for messages!
Error with sub en
ERROR:root:Traceback (most recent call last):
File "pub_sub_topic.py", line 128, in sub_hello_world
msg_received = await sub.recv_multipart()
File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/eventloop/future.py", line 170, in recv_multipart
dict(flags=flags, copy=copy, track=track)
File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/eventloop/future.py", line 321, in _add_recv_event
self._add_io_state(self._READ)
File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/asyncio/__init__.py", line 294, in _add_io_state
self.io_loop.add_reader(self, self._handle_recv)
File "/home/*user*/anaconda3/lib/python3.6/asyncio/selector_events.py", line 337, in add_reader
return self._add_reader(fd, callback, *args)
File "/home/*user*/anaconda3/lib/python3.6/asyncio/selector_events.py", line 264, in _add_reader
key = self._selector.get_key(fd)
File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 189, in get_key
return mapping[fileobj]
File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 70, in __getitem__
fd = self._selector._fileobj_lookup(fileobj)
File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 224, in _fileobj_lookup
return _fileobj_to_fd(fileobj)
File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 39, in _fileobj_to_fd
"{!r}".format(fileobj)) from None
ValueError: Invalid file object: <zmq.asyncio.Socket object at 0x7fa90a4a7528>
Exception ignored in: <bound method Socket.__del__ of <zmq.asyncio.Socket object at 0x7fa90a4a7528>>
Traceback (most recent call last):
File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/sugar/socket.py", line 70, in __del__
self.close()
File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/eventloop/future.py", line 160, in close
self._clear_io_state()
File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/asyncio/__init__.py", line 316, in _clear_io_state
self._drop_io_state(self._state)
File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/asyncio/__init__.py", line 303, in _drop_io_state
self.io_loop.remove_reader(self)
File "/home/*user*/anaconda3/lib/python3.6/asyncio/selector_events.py", line 342, in remove_reader
return self._remove_reader(fd)
File "/home/*user*/anaconda3/lib/python3.6/asyncio/selector_events.py", line 279, in _remove_reader
key = self._selector.get_key(fd)
File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 189, in get_key
return mapping[fileobj]
File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 70, in __getitem__
fd = self._selector._fileobj_lookup(fileobj)
File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 224, in _fileobj_lookup
return _fileobj_to_fd(fileobj)
File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 39, in _fileobj_to_fd
"{!r}".format(fileobj)) from None
ValueError: Invalid file object: <zmq.asyncio.Socket object at 0x7fa90a4a7528>
Sub jp: Going to wait for messages!
*snip* Same error as 'Sub en' *snip*
Pub en: Going to pub messages!
Pub en: Have send msg
Pub jp: Going to pub messages!
Pub jp: Have send msg
Pub en: Have send msg
Pub jp: Have send msg
Pub en: Have send msg
^CTraceback (most recent call last):
File "pub_sub_topic.py", line 139, in <module>
PubSubTopic()
File "pub_sub_topic.py", line 43, in __init__
self.sub_hello_world(lang='jp'),
File "/home/*user*/anaconda3/lib/python3.6/asyncio/base_events.py", line 454, in run_until_complete
self.run_forever()
File "/home/*user*/anaconda3/lib/python3.6/asyncio/base_events.py", line 421, in run_forever
self._run_once()
File "/home/*user*/anaconda3/lib/python3.6/asyncio/base_events.py", line 1390, in _run_once
event_list = self._selector.select(timeout)
File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 445, in select
fd_event_list = self._epoll.poll(timeout, max_ev)
KeyboardInterrupt
系统信息
- Ubuntu16.04
- Python 3.6(通过 Anaconda)
- libzmq 版本 4.2.2
- pyzmq 版本 16.0.2
你绝对不应该评论代理功能。问题是因为 zmq.proxy 函数永远阻塞,而你 运行 它与 "run_until_complete" 事件循环。您应该将事件循环执行类型更改为 run_forever.
一如既往,答案很简单。通过将它分成 3 个脚本,我们不必使用线程和异步编程,因此这应该可以帮助更多人。
打开 6 个终端,并在每个终端中 运行 以下命令:
python proxy_topic.py
#代理/ROS主题python proxy_pub.py
# 发布 "Hello World"python proxy_pub.py jp
# 发布 "Hello Sekai"python proxy_sub.py
# 接收所有消息python proxy_sub.py en
# 仅接收 "Hello World";没必要python proxy_sub.py jp
# 仅接收 "Hello Sekai";没必要
proxy_topic.py
import sys
import zmq
from zmq import Context
class ProxyPub:
def __init__(self, address='127.0.0.1', port1='5566', port2='5567'):
# get ZeroMQ version
print("Current libzmq version is %s" % zmq.zmq_version())
print("Current pyzmq version is %s" % zmq.pyzmq_version())
self.context = Context.instance()
# 2 sockets, because we can only bind once to a socket (as opposed to connect)
self.url1 = "tcp://{}:{}".format(address, port1)
self.url2 = "tcp://{}:{}".format(address, port2)
self.xpub_xsub_proxy()
# N publishers to 1 sub; proxy 1 sub to 1 pub; publish to M subscribers
def xpub_xsub_proxy(self):
print("Init proxy")
# Socket subscribing to publishers
frontend_pubs = self.context.socket(zmq.XSUB)
frontend_pubs.bind(self.url1)
# Socket publishing to subscribers
backend_subs = self.context.socket(zmq.XPUB)
backend_subs.bind(self.url2)
print("Try: Proxy... CONNECT!")
zmq.proxy(frontend_pubs, backend_subs)
print("CONNECT successful!")
if __name__ == '__main__':
print("Arguments given: {}".format(sys.argv))
ProxyPub()
proxy_pub.py
import sys
import zmq
from zmq import Context
import time
class ProxyPub:
def __init__(self, lang='en', address='127.0.0.1', port='5566'):
# get ZeroMQ version
print("Current libzmq version is %s" % zmq.zmq_version())
print("Current pyzmq version is %s" % zmq.pyzmq_version())
self.context = Context.instance()
self.url = "tcp://{}:{}".format(address, port)
self.pub_hello_world(lang)
def pub_hello_world(self, lang):
print("Init pub {}".format(lang))
# connect, because many publishers - 1 subscriber
pub = self.context.socket(zmq.PUB)
pub.connect(self.url)
if lang == 'en':
message = "Hello World"
sleep = 1
else:
message = "Hello Sekai" # Japanese
sleep = 2
# wait proxy and subs to b ready
time.sleep(.5)
# keep publishing "Hello World" / "Hello Sekai" messages
print("Pub {}: Going to pub messages!".format(lang))
while True:
# publish message to topic 'world'
# multipart: topic, message; async always needs `send_multipart()`?
pub.send_multipart([lang.encode('ascii'), message.encode('ascii')])
print("Pub {}: Have send msg".format(lang))
# slow down message publication
time.sleep(sleep)
if __name__ == '__main__':
print("Arguments given: {}".format(sys.argv))
if len(sys.argv) == 1:
ProxyPub()
elif len(sys.argv) == 2:
ProxyPub(lang=sys.argv[1])
else:
print("Too many arguments")
proxy_sub.py
import sys
import zmq
from zmq import Context
import time
class ProxyPub:
def __init__(self, lang='', address='127.0.0.1', port='5567'):
# get ZeroMQ version
print("Current libzmq version is %s" % zmq.zmq_version())
print("Current pyzmq version is %s" % zmq.pyzmq_version())
self.context = Context.instance()
self.url = "tcp://{}:{}".format(address, port)
self.sub_hello_world(lang)
def sub_hello_world(self, lang):
print("Init sub {}".format(lang))
# connect, because many subscribers - 1 (proxy) pub
sub = self.context.socket(zmq.SUB)
sub.connect(self.url)
# subscribe to topic 'en' or 'jp'
sub.setsockopt(zmq.SUBSCRIBE, lang.encode('ascii'))
# wait proxy to be ready; necessary?
time.sleep(.2)
# keep listening to all published message, filtered on topic
print("Sub {}: Going to wait for messages!".format(lang))
while True:
msg_received = sub.recv_multipart()
print("sub {}: {}".format(lang, msg_received))
if __name__ == '__main__':
print("Arguments given: {}".format(sys.argv))
if len(sys.argv) == 1:
ProxyPub()
elif len(sys.argv) == 2:
ProxyPub(lang=sys.argv[1])
else:
print("Too many arguments")