我如何在非 zmq 套接字和 pyzmq 之间进行调整?
How can I adapt between non zmq socket and pyzmq?
我想在非 ZMQ 套接字和 ZMQ 套接字之间编写一个桥接器。
客户代码:
import socket
if __name__ == '__main__':
HOST = "localhost"
PORT = 8888
BUFFER = 4096
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print sock
ret = sock.connect((HOST, PORT))
print ret
ret = sock.send('hello, tcpServer!')
print ret
recv = sock.recv(BUFFER)
print ('[tcpServer siad]: %s' % recv)
sock.close()
except e:
print e
代理代码,使用此代理向ZMQ_REP服务器发送请求。
import zmq
if __name__ == '__main__':
context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.bind("tcp://*:8888")
socket_req = context.socket(zmq.REQ)
socket_req.connect("tcp://localhost:5556")
while True:
clientid, message = socket.recv_multipart();
print("id: %r" % clientid)
print("request:",message.decode('utf8'))
socket_req.send(clientid, flags=zmq.SNDMORE, copy=False)
socket_req.send("Hi", copy=False)
clientid, message = socket_req.recv_multipart()
print("id: %r" % clientid)
print("request:",message.decode('utf8'))
ZMQ_REP 服务器代码:
import zmq
import time
import sys
if __name__ == '__main__':
port = '5556'
if len(sys.argv) > 1:
port = sys.argv[1]
int(port)
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:%s" % port)
while True:
message = socket.recv()
print "Received request: ", message
time.sleep(1)
socket.send("world from %s" % port)
REQ 获取错误:
Received request: k
Traceback (most recent call last):
File "req_server.py", line 21, in <module>
socket.send("world from %s" % port)
File "zmq/backend/cython/socket.pyx", line 574, in zmq.backend.cython.socket.Socket.send (zmq/backend/cython/socket.c:5434)
File "zmq/backend/cython/socket.pyx", line 621, in zmq.backend.cython.socket.Socket.send (zmq/backend/cython/socket.c:5196)
File "zmq/backend/cython/socket.pyx", line 181, in zmq.backend.cython.socket._send_copy (zmq/backend/cython/socket.c:2035)
File "zmq/backend/cython/checkrc.pxd", line 21, in zmq.backend.cython.checkrc._check_rc (zmq/backend/cython/socket.c:6248)
zmq.error.ZMQError: Operation cannot be accomplished in current state
socket_req.send(clientid, flags=zmq.SNDMORE, copy=False)
socket_req.send("Hi", copy=False)
最好的猜测是它没有正确注册 SNDMORE 标志并尝试发送一个全新的请求而不是附加到第一个请求(因此打破了 REQ 套接字的严格 SEND/RECEIVE 顺序)...因此套接字的 "current state" 不允许它发送消息的第二部分。尝试使用 send_multipart()
,或验证您的参数是否正确传递。
第一点:一般不建议在zmq中使用REQ/REP。使用更通用的 DEALER/ROUTER 组合。唯一的区别:
- 当 ROUTER 收到消息时,路由 ID 是消息的第一部分。这用于将回复路由回发件人。
- 锁步 req/rep/req/rep 序列未强制执行(这是您看到的错误)。
这是您使用 DEALER 的代理版本:
import zmq
if __name__ == '__main__':
context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.bind("tcp://*:8888")
socket_req = context.socket(zmq.DEALER)
socket_req.connect("tcp://localhost:5556")
while True:
clientid, message = socket.recv_multipart()
print("id: %r" % clientid)
print("request: %s" % message.decode('utf8'))
socket_req.send(message)
reply = socket_req.recv()
print("reply: %s" % reply.decode('utf8'))
socket.send_multipart([clientid, reply])
还有你的服务器,使用路由器:
import zmq
import time
import sys
if __name__ == '__main__':
port = 5556
if len(sys.argv) > 1:
port = int(sys.argv[1])
context = zmq.Context()
socket = context.socket(zmq.ROUTER)
socket.bind("tcp://127.0.0.1:%i" % port)
while True:
message = socket.recv_multipart()
req_id = message[0]
print("Received request: %s" % message[1:])
time.sleep(1)
socket.send_multipart([req_id, "world from %s" % port])
我想在非 ZMQ 套接字和 ZMQ 套接字之间编写一个桥接器。
客户代码:
import socket
if __name__ == '__main__':
HOST = "localhost"
PORT = 8888
BUFFER = 4096
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print sock
ret = sock.connect((HOST, PORT))
print ret
ret = sock.send('hello, tcpServer!')
print ret
recv = sock.recv(BUFFER)
print ('[tcpServer siad]: %s' % recv)
sock.close()
except e:
print e
代理代码,使用此代理向ZMQ_REP服务器发送请求。
import zmq
if __name__ == '__main__':
context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.bind("tcp://*:8888")
socket_req = context.socket(zmq.REQ)
socket_req.connect("tcp://localhost:5556")
while True:
clientid, message = socket.recv_multipart();
print("id: %r" % clientid)
print("request:",message.decode('utf8'))
socket_req.send(clientid, flags=zmq.SNDMORE, copy=False)
socket_req.send("Hi", copy=False)
clientid, message = socket_req.recv_multipart()
print("id: %r" % clientid)
print("request:",message.decode('utf8'))
ZMQ_REP 服务器代码:
import zmq
import time
import sys
if __name__ == '__main__':
port = '5556'
if len(sys.argv) > 1:
port = sys.argv[1]
int(port)
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:%s" % port)
while True:
message = socket.recv()
print "Received request: ", message
time.sleep(1)
socket.send("world from %s" % port)
REQ 获取错误:
Received request: k
Traceback (most recent call last):
File "req_server.py", line 21, in <module>
socket.send("world from %s" % port)
File "zmq/backend/cython/socket.pyx", line 574, in zmq.backend.cython.socket.Socket.send (zmq/backend/cython/socket.c:5434)
File "zmq/backend/cython/socket.pyx", line 621, in zmq.backend.cython.socket.Socket.send (zmq/backend/cython/socket.c:5196)
File "zmq/backend/cython/socket.pyx", line 181, in zmq.backend.cython.socket._send_copy (zmq/backend/cython/socket.c:2035)
File "zmq/backend/cython/checkrc.pxd", line 21, in zmq.backend.cython.checkrc._check_rc (zmq/backend/cython/socket.c:6248)
zmq.error.ZMQError: Operation cannot be accomplished in current state
socket_req.send(clientid, flags=zmq.SNDMORE, copy=False)
socket_req.send("Hi", copy=False)
最好的猜测是它没有正确注册 SNDMORE 标志并尝试发送一个全新的请求而不是附加到第一个请求(因此打破了 REQ 套接字的严格 SEND/RECEIVE 顺序)...因此套接字的 "current state" 不允许它发送消息的第二部分。尝试使用 send_multipart()
,或验证您的参数是否正确传递。
第一点:一般不建议在zmq中使用REQ/REP。使用更通用的 DEALER/ROUTER 组合。唯一的区别:
- 当 ROUTER 收到消息时,路由 ID 是消息的第一部分。这用于将回复路由回发件人。
- 锁步 req/rep/req/rep 序列未强制执行(这是您看到的错误)。
这是您使用 DEALER 的代理版本:
import zmq
if __name__ == '__main__':
context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.bind("tcp://*:8888")
socket_req = context.socket(zmq.DEALER)
socket_req.connect("tcp://localhost:5556")
while True:
clientid, message = socket.recv_multipart()
print("id: %r" % clientid)
print("request: %s" % message.decode('utf8'))
socket_req.send(message)
reply = socket_req.recv()
print("reply: %s" % reply.decode('utf8'))
socket.send_multipart([clientid, reply])
还有你的服务器,使用路由器:
import zmq
import time
import sys
if __name__ == '__main__':
port = 5556
if len(sys.argv) > 1:
port = int(sys.argv[1])
context = zmq.Context()
socket = context.socket(zmq.ROUTER)
socket.bind("tcp://127.0.0.1:%i" % port)
while True:
message = socket.recv_multipart()
req_id = message[0]
print("Received request: %s" % message[1:])
time.sleep(1)
socket.send_multipart([req_id, "world from %s" % port])