ZeroMQ 没有收到客户端程序发送的值

ZeroMQ not receiving a value sent by client program

我创建了两个程序来使用 ZeroMQ 发送和接收视频源。但是,接收程序总是卡在 .recv() 方法上。

我为这个程序使用了两个ZeroMQ库:一个是原生的zmq,另一个是派生的imagezmq imagezmq 用于发送和接收来自视频的帧数据,而本机 zmq 库用于发送和接收图像发送时的时间。

imagezmq 部分工作正常。

程序只卡在 zmq 部分。

以下是我的两个程序:

FinalCam.py

import struct
import time
import imutils
import imagezmq
import cv2
import zmq    
import socket
import pickle

#                                         # image sending
sender = imagezmq.ImageSender(connect_to='tcp://localhost:5555')

hostName = socket.gethostname()           # send RPi hostname with each image

vid_dir = "/root/redis-opencv-videostream/vtest.avi"
cap = cv2.VideoCapture(vid_dir)           # init the camera

context = zmq.Context()                   # setup for sending time
socket = context.socket(zmq.PUB)
socket.connect("tcp://localhost:6666")

while True:                               # send images as stream until Ctrl-C

    ret, frame = cap.read()
    frame = imutils.resize(frame, width=400)  # resize without compressionq

    captureTime = time.time()
    sender.send_image(hostName, frame)
    print (captureTime)
    captureTime = struct.pack('d', captureTime)
    #msg = pickle.dumps(captureTime)
    print("message primed")
    socket.send(captureTime)
    print("time sent")

生成此输出:

1591824603.5772414
message primed
time sent

FinalRecieve.py

import cv2
import imagezmq
import time
import zmq
import struct

FRAMES = 5
image_hub = imagezmq.ImageHub()           # image socket

context = zmq.Context()                   #  time socket
socket = context.socket(zmq.SUB)
socket.bind("tcp://*:6666")

while True:                               # show streamed images until Ctrl-C

    loopTime = time.time()

    for i in range (0, FRAMES):
        hostName, frame = image_hub.recv_image()
        image_hub.send_reply(b'OK')
        print("recieved image, waiting for time")
        captureTime = socket.recv()
        print("meow")
        print(captureTime)

    finishTime = time.time()
    fpsTime = finishTime - loopTime

    fps = FRAMES / fpsTime
    print(fps)

生成此输出:

received image, waiting for time

这里有几件事可以尝试让原生 zmq 部分正常工作:

SUB-套接字使用.connect()-方法: socket.connect("tcp://localhost:6666")

.bind()-方法用于您的 PUB-套接字: socket.bind("tcp://*:6666")

已解释 here in the guide 应该使用 connect 从套接字创建传出连接。

.bind() 的同级文档中解释说它用于接受连接。

也尝试设置套接字选项:socket.setsockopt(zmq.SUBSCRIBE, "")

据描述 here in the guide SUB-sockets 最初会过滤掉所有消息,因此这可以解释为什么您没有收到任何消息。上面的示例提供了一个空过滤器,它接受所有传入消息。

重要的是要注意,对于基于 PUBSUB 的分发,由于其连接时间或网络条件,SUB 可能不一定会收到消息。例如。在订阅者连接之前从发布者发送的所有内容都是不可接收的

问题定义:

"The imagezmq part works fine. The program only gets stuck on the zmq part. "


解决方案?

ZeroMQ 非常智能(自 v2.0+ 起),完全不需要任何人严格决定是否 { .bind() | .connect() } 并且一个 AccessPoint 可以自由地 .bind() 一些 .connect() 其他 TransportClass-1:N 通信拓扑的通道(换句话说,“反向”.bind()/.connect()总是 可能)。

ImageZMQ-衍生模块并非如此。在您的控制域后面有一组硬连线选择(以及任何硬核劫持 Class-内部属性 { ImageHub.zmq_socket | ImageSender.zmq_socket } 几乎不是 ImageZMQ 作者 ( architecture-(co)-authors ) ) 的梦想。

鉴于 imagezmq 已发布的内部机制本身是预先决定的硬编码选择(相同的 Class 实例,取决于模式,有时 .bind().connect() 否则) 被宣布为有效,让我们专注于使用它(已发布)的最大值。

基于上述内容,编写您的工作部分,以便将时间发送到工作方案“内部”:

PAYLOAD_MASK = "{0:}|||{1:}"
...
while ...
      sender.send_image( PAYLOAD_MASK.format( time.time(), hostName ),
                         frame
                         )

并且可以在.recv()端轻松解码,无需修复使用过的struct.pack()问题(在,在 ZeroMQ 互连的世界中,人们永远不知道远程平台是什么,字节顺序( Endian-convention )将被假定为“那里”,所以struct.pack() 应始终在 format 字符串中显式声明 Endian 类型。始终。如果不确定,可以自行设计- 检测消息,这可能会挽救这种天真的使用并测试/调整你的 format-string 用于 .unpack()-method 的本地使用对于任何一种情况任何隐式盲目 .pack()-sender ... 超出了此 post 的范围,但所有 RPi / 非 RPi 平台项目都倾向于此(仅)-假设-“相同”-Endian 警告)


惊喜:

如果使用 pyzmq 版本 18..,它未在 ImageZMQ 的兼容性列表中列出并且用例注定会崩溃或导致隐患

所以应该从 zmq.pyzmq_version() 检查开始(配置管理中针对受控环境的专业策略,不是吗?)并捕获并解决任何非-匹配案例。


为什么 zmq 部分不起作用?

很难说。没有完整的代码,人们可能只是猜测。 zmq.LINGER 未明确设置为零时,我的候选人会出现错误/丢失终止,这会导致挂断 Context() - 阻止实际资源直到重新启动的实例。正确使用 ZeroMQ 工具反映了这些防御性编程方法(LINGERCONFLATEMAXMSGSIZEIMMEDIATE、白名单和许多其他防御性 .setsockopt()Context()-参数化选项 ),因为 很复杂,可能会在您的控制域或视线之外的许多地方发生故障。

因此,如果您的架构系统要变得健壮和自我修复,请毫不犹豫地成为一名防御性编程设计师。

aLocalCONTEXT = zmq.Context( nIOthreads )
aSUBsocket = aLocalCONTEXT.socket( zmq.SUB )
try: 
      aSUBsocket.bind( "tcp://*:6666" )
except:
      ...
aSUBsocket.setsockopt( zmq.LINGER,      0 )
aSUBsocket.setsockopt( zmq.SUBSCRIBE, b"" )
aSUBsocket.setsockopt( zmq.MAXMSGSIZE, ...)
...

最好重新阅读有关调整其他 ISO-OSI-L2/L3 相关参数以获得最佳性能和最安全 策略的本机 ZeroMQ API 文档。值得花时间为每个新的 API 更新这样做,自 v2.1+...

以来一直如此

这个的反例?

看看 .send_reply() method 如何不受其自身 ImageHub[ 的非 REQ-REP 实例的崩溃保护=116=] Class.

为什么?

到目前为止,对 .send_reply() 方法的非保护调用将在对任何 ImageHub-class 实例进行任何此类调用时使应用程序崩溃,该实例是在其中初始化的-默认REQ_REP = False(即在PUB/SUB模式的SUB端可用)。

aNonDefaultImageHUB_instance.send( "this will crash" )
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/ms/anaconda2/lib/python2.7/site-packages/zmq/sugar/socket.py", line 395, in send
    return super(Socket, self).send(data, flags=flags, copy=copy, track=track)
  File "zmq/backend/cython/socket.pyx", line 725, in zmq.backend.cython.socket.Socket.send
  File "zmq/backend/cython/socket.pyx", line 772, in zmq.backend.cython.socket.Socket.send
  File "zmq/backend/cython/socket.pyx", line 247, in zmq.backend.cython.socket._send_copy
  File "zmq/backend/cython/socket.pyx", line 242, in zmq.backend.cython.socket._send_copy
  File "zmq/backend/cython/checkrc.pxd", line 25, in zmq.backend.cython.checkrc._check_rc
zmq.error.ZMQError: Operation not supported