使用 ZEROMQ 和 NORM 向大量 WiFI 设备组播数据包

Using ZEROMQ and NORM to multicast packets to a large number of WiFI devices

我正在尝试使用 ZeroMQ 和 NORM 协议通过无线网络发送文件。我目前正在使用 PUB/SUB 模式,因为据我所知,这是 NORM 和 ZeroMQ 支持的唯一模式。

我已将其设置为可以很好地传递小消息,但偶尔收件人收不到消息。从那时起,消息就被丢弃了。偶尔可以通过重新启动发布服务器或订阅服务器来解决这个问题,但不是每次都可以。我试过调整发送位的大小和每次调用发送之间的时间,但无济于事。在连接变得不稳定之前,我似乎可以收到大约 20-60 条多播消息。如果我使用相同的代码但使用 TCP 进行设置,则连接会更加可靠,在错误发生之前大约有数千条消息。

我已经尝试实施包装器 class 以在一段时间不活动后重新启动订阅者 - 但没有用。在 while 循环中设置 socket.recv(zmq.NOBLOCK) 也不行。

我知道此处描述的 Pub-Sub 同步模式,http://zguide.zeromq.org/page:all#Node-Coordination, but NORM, as implemented in the ZeroMQ's norm_engine.cpp (https://github.com/zeromq/libzmq/blob/master/src/norm_engine.cpp) 似乎没有设置为允许此模式。

有没有办法重新发送丢失的数据包,或者确保健康的多播连接?

代码是Python。

出版商:

import zmq
import time
import os
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.connect("norm://224.0.0.1:3000")
i = 1

imgfile_path = "/home/adam/programs/zmq/tux.svg.png"
imgsize = os.stat(imgfile_path).st_size
print "attempting to send", imgsize, "bytes"

sleep_time = 1
topic = ""
packet_size = 500
left = packet_size
f = open(imgfile_path, 'rb')
fi = f.read(packet_size)
while (imgsize - left) > packet_size:
    print "sent packet number:", i
    print "size: ", len(topic + str(i)[-1] + fi)
    i += 1
    socket.send(topic + str(i)[-1] + fi)

    fi = f.read(packet_size)
    left += packet_size
    time.sleep(sleep_time)
print imgsize, left
time.sleep(sleep_time)
fi = f.read(imgsize - left)
print fi
socket.send(topic + " " + fi)
f.close()

订阅者:

import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.bind("norm://224.0.0.2:3000")
socket.setsockopt(zmq.SUBSCRIBE, "")

imgdir = "/home/adam/programs/zmq/img/"
filename = "tux.svg.png"
destfile = imgdir + filename
packet_size = 501
print "attempting to receive"
f = open(destfile, 'wb')
while True:
    msg = None
    while msg is None:
        try:
            msg = socket.recv(zmq.NOBLOCK)
        except:
            pass
    if msg: 
        print "msg = ", msg[0]
        print "we got something", len(msg)
        f.write(msg[1:])
        if len(msg) < packet_size:
            break
f.close()
print "exiting..."

此外,一旦我可以确保我可以发送文件,我想调整前向纠错和NACK率,这就是NORM对我如此有用的原因。有没有办法不用重写就可以做到这一点 norm_engine.cpp?

谢谢!

文档页面 here 证实了您假设的某些内容。也就是说,目前只有 PUB/SUB,尽管您确实可以通过对 PUB/SUB 使用 NORM 和对 REQ/REP 使用 TCP 来进行 link 的同步.

否则,我相信您在 NORM 实施阶段还处于相当早的阶段,因为(也来自我的 link)它谈到了所有尚未完成的事情。那是一年前的事了,但从那时起我就没有看到太多人谈论它了。

根据您的基础设施的具体情况,多播甚至可能不是最佳选择,正如所讨论的那样 here,none 多播协议确实是为当今的高性能网络速度而设计的。可能是您的实施成为这些传输协议自然结果出现的问题恢复不佳的受害者。

编辑:

根据您 post 中的 github link,代码自 2014 年 3 月 19 日以来一直没有更新,与我 link 页面的日期相同上面的 ed 已经发布,所以它所说的关于 ZMQ 中的 NORM 传输的任何内容都应该是最新的。

在页面底部,关于 ZMQ NORM 实现可以做什么和不能做什么的要点:

  1. 当前的 "norm_engine" 采用了一种相当具体的 NORM 传输选项形式。可能需要通过 ZeroMQ API 公开额外的 NORM 特性和传输模式。一些例子包括:
    • NORM 能够为应用程序提供类似 UDP 的 "best effort" 和 "better than best effort"(使用数据包擦除编码)交付服务。这将包括对 NORM FEC 编码参数的控制。

因此,看起来它(在其当前状态下)没有让您对前向纠错进行任何控制,只是在其默认反应(高性能)状态下使用它。默认情况下,应该 为您的连接提供可靠性,但如果 ZMQ 的实现没有彻底完成它的步伐,它可能比您想要的更脆弱。由于缺少内容和补丁,我认为它还没有为您准备好。