PYZMQ REQ-REP Multiple Clients ZMQError with Polling
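Using the REQ-REP pattern, I am trying to request multiple clients with a poll timeout, so that if the server detects that it cannot receive a message from the first client, it times out and moves on to the next client.
But it seems that after the initial timeout, the next message cannot be sent to the second client.
I get this error at the socket.send_string("Sensor Data") line of the server: zmq.error.ZMQError: Operation cannot be accomplished in current state
Full output: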
    Connecting to machine...
    Successfully connected to machine 127.0.0.1:9999
    Successfully connected to machine 127.0.0.1:9998
    Sending request  0 ...
    Machine did not respond
    Sending request  1 ...
    Traceback (most recent call last):
      File "C:\Users\tobiw\Documents\Python\Raspberry Pi\zmq\REQrep\testSIMPLEpoll.py", line 16, in <module>
        socket.send_string("Sensor Data")
      File "C:\Users\tobiw\AppData\Local\Programs\Python\Python36-32\lib\site-packages\pyzmq-17.0.0b3-py3.6-win32.egg\zmq\sugar\socket.py", line 541, in send_string
        return self.send(u.encode(encoding), flags=flags, copy=copy, **kwargs)
      File "C:\Users\tobiw\AppData\Local\Programs\Python\Python36-32\lib\site-packages\pyzmq-17.0.0b3-py3.6-win32.egg\zmq\sugar\socket.py", line 384, in send
        return super(Socket, self).send(data, flags=flags, copy=copy, track=track)
      File "zmq/backend/cython/socket.pyx", line 727, in zmq.backend.cython.socket.Socket.send
      File "zmq/backend/cython/socket.pyx", line 771, in zmq.backend.cython.socket.Socket.send
      File "zmq/backend/cython/socket.pyx", line 249, in zmq.backend.cython.socket._send_copy
      File "zmq/backend/cython/socket.pyx", line 244, in zmq.backend.cython.socket._send_copy
      File "zmq/backend/cython/checkrc.pxd", line 25, in zmq.backend.cython.checkrc._check_rc
        raise ZMQError(errno)
    zmq.error.ZMQError: Operation cannot be accomplished in current state
    [Finished in 5.3s with exit code 1]
Server:

    import zmq
    import json

    ports = ["127.0.0.1:9999", "127.0.0.1:9998"]

    context = zmq.Context()
    print("Connecting to machine...")
    socket = context.socket(zmq.REQ)
    socket.setsockopt(zmq.LINGER, 0)
    for port in ports:
        socket.connect("tcp://%s" % port)
        print("Successfully connected to machine %s" % port)

    for request in range(len(ports)):
        print("Sending request ", request, "...")
        socket.send_string("Sensor Data")  # <-- error occurs here

        # use poll for timeouts:
        poller = zmq.Poller()
        poller.register(socket, zmq.POLLIN)

        socks = dict(poller.poll(5 * 1000))

        if socket in socks:
            try:
                msg_json = socket.recv()
                sens = json.loads(msg_json)
                response = "Sensor: %s :: Data: %s :: Client: %s" % (sens['sensor'], sens['data'], sens['client'])
                print("Received reply ", request, "[", response, "]")
            except IOError:
                print("Could not connect to machine")
        else:
            print("Machine did not respond")
Client:

    import zmq
    import time
    import json

    port = "9998"  # multiple similar clients but just with different ports

    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind("tcp://*:%s" % port)

    while True:
        # Wait for next request from server
        message = str(socket.recv(), "utf-8")
        print("Received request: ", message)
        time.sleep(1)
        msgDict = {
            'sensor': "6",
            'data': "123456789",
            'client': "9876",
        }
        msg_json = json.dumps(msgDict)
        socket.send_string(msg_json)
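If the server is able to receive a message from the first client, the second send to the second client works fine, but if the server is not able to receive a message from the first client, the error is reproduced.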
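First and foremost, zmq.error.ZMQError: Operation cannot be accomplished in current state in a REQ-REP pattern indicates that the strict send -> recv -> send -> recv order has been violated. In my case, because of the poll on receive inside the for loop, the REQ server side never performed the final recv when the poll timed out. When the loop came back around, it went to send again, which resulted in send -> recv -> send -> timeout -> send. A double send is illegal on a REQ socket.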
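To make the ordering rule concrete, here is a minimal sketch of the REQ socket's send/recv alternation as a two-state machine. It is a toy model written for illustration only, not libzmq's implementation; the class name ReqStateModel and the use of RuntimeError in place of zmq.error.ZMQError are assumptions of this example.

```python
class ReqStateModel:
    """Toy model of a strict REQ socket: send and recv must alternate.

    Hypothetical illustration only; the real state machine lives in libzmq.
    """

    def __init__(self):
        # False: the next legal call is send(); True: the next legal call is recv()
        self.awaiting_reply = False

    def send(self, message):
        if self.awaiting_reply:
            raise RuntimeError("Operation cannot be accomplished in current state")
        self.awaiting_reply = True

    def recv(self):
        if not self.awaiting_reply:
            raise RuntimeError("Operation cannot be accomplished in current state")
        self.awaiting_reply = False


sock = ReqStateModel()
sock.send("Sensor Data")  # request 0 is accepted

# The poll times out here. No recv() is performed, so the state does not
# change: the model is still waiting for the reply to request 0.

try:
    sock.send("Sensor Data")  # request 1: a second send without a recv
    second_send_ok = True
except RuntimeError as exc:
    second_send_ok = False
    print("second send failed:", exc)
```

In this model the timeout itself is harmless; what fails is the next send, because nothing has moved the socket out of the waiting-for-reply state.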
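What I did to rectify it:
I switched from the REQ-REP pattern to a DEALER-REP pattern. This gives me an asynchronous server that can talk to multiple REP clients.
With the client staying the same, this is the new server for those interested: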
Server:

    import zmq
    import json

    ports = ["127.0.0.1:9999", "127.0.0.1:9998"]

    context = zmq.Context()
    print("Connecting to machine...")
    socket = context.socket(zmq.DEALER)
    socket.setsockopt(zmq.LINGER, 0)
    for port in ports:
        socket.connect("tcp://%s" % port)
        print("Successfully connected to machine %s" % port)

    for request in range(len(ports)):
        print("Sending request ", request, "...")
        socket.send_string("", zmq.SNDMORE)  # delimiter
        socket.send_string("Sensor Data")  # actual message

        # use poll for timeouts:
        poller = zmq.Poller()
        poller.register(socket, zmq.POLLIN)

        socks = dict(poller.poll(5 * 1000))

        if socket in socks:
            try:
                socket.recv()  # discard delimiter
                msg_json = socket.recv()  # actual message
                sens = json.loads(msg_json)
                response = "Sensor: %s :: Data: %s :: Client: %s" % (sens['sensor'], sens['data'], sens['client'])
                print("Received reply ", request, "[", response, "]")
            except IOError:
                print("Could not connect to machine")
        else:
            print("Machine did not respond")
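For comparison, another way to avoid the illegal double send is to keep REQ but give every request its own short-lived socket, and simply close that socket when the poll times out. This is similar in spirit to the "Lazy Pirate" pattern in the ZeroMQ guide. The sketch below is one possible shape of that idea, not the approach taken above; the helper names request_with_timeout and poll_all and the default timeout are assumptions made for this example.

```python
import zmq


def request_with_timeout(context, endpoint, payload, timeout_ms=5000):
    """Send one request on a fresh REQ socket.

    Returns the reply as bytes, or None if no reply arrives in time.
    Hypothetical helper written for illustration.
    """
    socket = context.socket(zmq.REQ)
    socket.setsockopt(zmq.LINGER, 0)  # drop unsent messages when closing
    socket.connect(endpoint)
    try:
        socket.send_string(payload)
        # poll() returns a nonzero event mask when a reply is ready, 0 on timeout
        if socket.poll(timeout_ms, zmq.POLLIN):
            return socket.recv()
        return None
    finally:
        # Closing discards the socket together with its send/recv state,
        # so the next request always starts from a clean REQ socket.
        socket.close()


def poll_all(endpoints, payload="Sensor Data", timeout_ms=5000):
    """Query each endpoint in turn; map endpoint -> reply bytes or None."""
    context = zmq.Context.instance()
    return {
        endpoint: request_with_timeout(context, endpoint, payload, timeout_ms)
        for endpoint in endpoints
    }
```

Calling poll_all(["tcp://127.0.0.1:9999", "tcp://127.0.0.1:9998"]) would query both machines one after the other. Because each socket is connected to exactly one endpoint, a reply can always be attributed to the machine that sent it, at the cost of reconnecting for every request.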