pyzmq recv_json 无法解码 send_json 发送的消息
pyzmq recv_json can't decode message sent by send_json
这是我的代码,去掉了多余的东西:
coordinator.py
context = zmq.Context()
socket = context.socket(zmq.ROUTER)
port = socket.bind_to_random_port(ZMQ_ADDRESS)
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
while True:
event = poller.poll(1)
if not event:
continue
process_id, val = socket.recv_json()
worker.py
context = zmq.Context()
socket = context.socket(zmq.DEALER)
socket.connect('%s:%s' % (ZMQ_ADDRESS, kwargs['zmq_port']))
socket.send_json(
(os.getpid(), True)
)
当我 运行 时会发生什么:
process_id, val = socket.recv_json()
File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/zmq/sugar/socket.py", line 380, in recv_json
return jsonapi.loads(msg)
File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/zmq/utils/jsonapi.py", line 71, in loads
return jsonmod.loads(s, **kwargs)
File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/simplejson/__init__.py", line 451, in loads
return _default_decoder.decode(s)
File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/simplejson/decoder.py", line 406, in decode
obj, end = self.raw_decode(s)
File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/simplejson/decoder.py", line 426, in raw_decode
raise JSONDecodeError("No JSON object could be decoded", s, idx)
JSONDecodeError: No JSON object could be decoded: line 1 column 0 (char 0)
如果我使用 ipdb:
> /Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/zmq/sugar/socket.py(380)recv_json()
379 msg = self.recv(flags)
--> 380 return jsonapi.loads(msg)
381
ipdb> p msg
'\x00\x9f\xd9\x06\xa2'
嗯,这看起来不像 JSON...这是 pyzmq 中的错误吗?我用错了吗?
嗯,好的,找到答案了。
ØMQ 接口中存在令人讨厌的不对称性,因此您必须了解所使用的套接字类型。
在这种情况下,我使用 ROUTER/DEALER 架构意味着从 DEALER 套接字发送的 JSON 消息,当我这样做时 send_json
,被包裹在 multipart 邮件信封。第一部分是客户端 ID(我猜这是我上面得到的 '\x00\x9f\xd9\x06\xa2'
),第二部分是我们感兴趣的 JSON 字符串。
所以在我的 coordinator.py 的最后一行中,我需要改为这样做:
id_, msg = socket.recv_multipart()
process_id, val = json.loads(msg)
恕我直言,这是 ØMQ/pyzmq 的糟糕设计,库应该将其抽象化,并且只有 send
和 recv
方法,这才有效。
我从这个问题中得到了线索 How can I use send_json with pyzmq PUB SUB 所以看起来 PUB/SUB 架构也有同样的问题,毫无疑问其他人也有。
这在文档中有描述,但不是很清楚
http://zguide.zeromq.org/page:all#The-Asynchronous-Client-Server-Pattern
更新
事实上,我发现在我的案例中,我可以通过直接使用消息信封的 'client id' 部分来进一步简化代码。所以工人只是做:
context = zmq.Context()
socket = context.socket(zmq.DEALER)
socket.identity = str(os.getpid()) # or I could omit this and use ØMQ client id
socket.connect('%s:%s' % (ZMQ_ADDRESS, kwargs['zmq_port']))
socket.send_json(True)
同样值得注意的是,当你想从 ROUTER 向另一个方向发送消息时,你必须将它作为多部分发送,指定它要发送给哪个客户端,例如:
coordinator.py
context = zmq.Context()
socket = context.socket(zmq.ROUTER)
port = socket.bind_to_random_port(ZMQ_ADDRESS)
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
pids = set()
while True:
event = poller.poll(1)
if not event:
continue
process_id, val = socket.recv_json()
pids.add(process_id)
# need some code in here to decide when to stop listening
# and break the loop
for pid in pids:
socket.send_multipart([pid, 'a string message'])
# ^ do your own json encoding if required
我想可能有一些 ØMQ 方法来执行广播消息,而不是像我上面那样循环发送到每个客户端。我希望文档清楚地描述了每种可用的套接字类型以及如何使用它们。
这是我的代码,去掉了多余的东西:
coordinator.py
context = zmq.Context()
socket = context.socket(zmq.ROUTER)
port = socket.bind_to_random_port(ZMQ_ADDRESS)
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
while True:
event = poller.poll(1)
if not event:
continue
process_id, val = socket.recv_json()
worker.py
context = zmq.Context()
socket = context.socket(zmq.DEALER)
socket.connect('%s:%s' % (ZMQ_ADDRESS, kwargs['zmq_port']))
socket.send_json(
(os.getpid(), True)
)
当我 运行 时会发生什么:
process_id, val = socket.recv_json()
File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/zmq/sugar/socket.py", line 380, in recv_json
return jsonapi.loads(msg)
File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/zmq/utils/jsonapi.py", line 71, in loads
return jsonmod.loads(s, **kwargs)
File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/simplejson/__init__.py", line 451, in loads
return _default_decoder.decode(s)
File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/simplejson/decoder.py", line 406, in decode
obj, end = self.raw_decode(s)
File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/simplejson/decoder.py", line 426, in raw_decode
raise JSONDecodeError("No JSON object could be decoded", s, idx)
JSONDecodeError: No JSON object could be decoded: line 1 column 0 (char 0)
如果我使用 ipdb:
> /Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/zmq/sugar/socket.py(380)recv_json()
379 msg = self.recv(flags)
--> 380 return jsonapi.loads(msg)
381
ipdb> p msg
'\x00\x9f\xd9\x06\xa2'
嗯,这看起来不像 JSON...这是 pyzmq 中的错误吗?我用错了吗?
嗯,好的,找到答案了。
ØMQ 接口中存在令人讨厌的不对称性,因此您必须了解所使用的套接字类型。
在这种情况下,我使用 ROUTER/DEALER 架构意味着从 DEALER 套接字发送的 JSON 消息,当我这样做时 send_json
,被包裹在 multipart 邮件信封。第一部分是客户端 ID(我猜这是我上面得到的 '\x00\x9f\xd9\x06\xa2'
),第二部分是我们感兴趣的 JSON 字符串。
所以在我的 coordinator.py 的最后一行中,我需要改为这样做:
id_, msg = socket.recv_multipart()
process_id, val = json.loads(msg)
恕我直言,这是 ØMQ/pyzmq 的糟糕设计,库应该将其抽象化,并且只有 send
和 recv
方法,这才有效。
我从这个问题中得到了线索 How can I use send_json with pyzmq PUB SUB 所以看起来 PUB/SUB 架构也有同样的问题,毫无疑问其他人也有。
这在文档中有描述,但不是很清楚
http://zguide.zeromq.org/page:all#The-Asynchronous-Client-Server-Pattern
更新
事实上,我发现在我的案例中,我可以通过直接使用消息信封的 'client id' 部分来进一步简化代码。所以工人只是做:
context = zmq.Context()
socket = context.socket(zmq.DEALER)
socket.identity = str(os.getpid()) # or I could omit this and use ØMQ client id
socket.connect('%s:%s' % (ZMQ_ADDRESS, kwargs['zmq_port']))
socket.send_json(True)
同样值得注意的是,当你想从 ROUTER 向另一个方向发送消息时,你必须将它作为多部分发送,指定它要发送给哪个客户端,例如:
coordinator.py
context = zmq.Context()
socket = context.socket(zmq.ROUTER)
port = socket.bind_to_random_port(ZMQ_ADDRESS)
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
pids = set()
while True:
event = poller.poll(1)
if not event:
continue
process_id, val = socket.recv_json()
pids.add(process_id)
# need some code in here to decide when to stop listening
# and break the loop
for pid in pids:
socket.send_multipart([pid, 'a string message'])
# ^ do your own json encoding if required
我想可能有一些 ØMQ 方法来执行广播消息,而不是像我上面那样循环发送到每个客户端。我希望文档清楚地描述了每种可用的套接字类型以及如何使用它们。