向服务器 pyzmq 发送数据时出现问题

Issue in sending data to the server pyzmq

我有以下代码使用请求和回复模式将数组从服务器发送到客户端,

def send_array( socket, A, flags = 0, copy = True, track = False ):
 """send a numpy array with metadata"""
 md = dict( dtype = str( A.dtype ),
           shape = A.shape,)
 socket.send_json( md, flags | zmq.SNDMORE )
 return socket.send(A,  flags, copy = copy, track = track )

 context = zmq.Context()
 socket = context.socket(zmq.REQ)
 socket.connect("tcp://localhost:5667")
 videoFile = "C:/Users/Downloads/test.mp4"
 camera = cv2.VideoCapture(videoFile)
 while True:
   grabbed, frame = camera.read()
  try:
   frame = cv2.resize( frame, (224, 224) ).astype( "float32" )
  except cv2.error:
  break
  image= img_to_array(frame)
  image=image.reshape((1,image.shape[0],image.shape[1],image.shape[2]))
  image=preprocess_input(image)
  preds=model.predict(image)
  send_array(socket, preds)
 socket.close()

def recv_array( socket, flags = 0, copy = True, track = False ):
  md = socket.recv_json( flags = flags )
  msg = socket.recv(flags = flags, copy = copy, track = track )
  #buf = buffer( msg )
  #pass;
  #img = np.frombuffer(bytes(memoryview(msg)), dtype=md['dtype'])
  A = numpy.frombuffer( bytes(memoryview(msg)), dtype = md['dtype'] )
  return A.reshape(md['shape'])

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5667")
time.sleep(0.2)

while True:
  frame  = recv_array(socket)
  print(frame)
socket.close()

我没有检索到任何错误,但没有数据从客户端发送到服务器。谢谢,非常感谢您的帮助。

欢迎来到零之禅 的土地。

使用 ZeroMQ REQ/REP 可扩展的正式通信模式原型,必须满足其记录的属性,否则工作将无法完成。

非常REQ/REP 原型被记录为使用并遵循全状态分布式有限状态自动机(dFSA ),其中 REQ.send()-询问,下一个 REP.recv()-s 并且必须 REP.send()-answer,必须是 REQ.recv()-ed,在任何 REQ 可以做出之前任何下一个 .send() 并且此 dFSA 循环无限重复(或直到出现无法挽救的相互死锁 - 有关更多详细信息,请参见 this and this)。

因此,您的代码永远不会忽略保持 REQ 端的两步舞:
.send()-.recv()-.send()-.recv()-... 并且,
对称地
一个REP端.recv()-.send()-.recv()-.send()-...

不这样做会挂起所述 dFSA 的全状态转换流程,并且代码会留下 returning EFSM-error任何此类调用 { .recv() | .send() } 方法的状态,与压缩在所述 dFSA 中的 REQ/REP 常规序列不对齐。

最后但并非最不重要的一点是,您的代码原样很脆弱且容易崩溃,从不测试 return 值,因此没有自适应处理警告,例如 multipart-message 没有被读取直到最后(或错过下一个,只是假设并盲目相信,多部分消息部分)。这将以类似但同样痛苦的方式使代码崩溃。

比本地纯 [SERIAL] 代码复杂得多。