ZMQ - Packet Loss

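I plan to use ZMQ to implement a Pub/Sub model and tried out the ZMQ example (weather) program. [Note: I made a few modifications to the program]

Below are my observations about ZMQ:

Please find the Pastebin link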


Publisher Sample Code:

//
//  Weather update server in C++
//  Binds PUB socket to tcp://*:5556
//  Publishes random weather updates
//
//  Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com>
//
#include <cppzmq-master/zmq.hpp>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <chrono>      // std::chrono::system_clock is used for timing below
#include <iostream>
#include <unistd.h>

using namespace std::chrono;

#if (defined (WIN32))
#include <zhelpers.hpp>
#endif

#define within(num) (unsigned long) ((float) num * random () / (RAND_MAX + 1.0))

int main (int argc, char *argv[]) {

    //  Prepare our context and publisher
    zmq::context_t context (1);
    zmq::socket_t publisher (context, ZMQ_PUB);
    publisher.bind("tcp://*:5556");
    unsigned long ticks_count = atoi(argv [1]);
    //publisher.bind("ipc://weather.ipc");                // Not usable on Windows.

    //  Initialize random number generator
    srandom ((unsigned) time (NULL));
    unsigned long counter = 0;
    zmq_sleep(5);

    std::chrono::time_point<std::chrono::system_clock> now = std::chrono::system_clock::now();
    auto duration = now.time_since_epoch();
    auto timestamp3 = std::chrono::duration_cast<std::chrono::nanoseconds>(duration).count();

    for(unsigned long i = 1; i <= ticks_count; i++) {

        unsigned long zipcode, temperature, relhumidity;

        //  Get values that will fool the boss
        zipcode     = ++counter;
        temperature = within (215) - 80;
        relhumidity = within (50) + 10;

        //  Send message to all subscribers
        zmq::message_t message(20);
        snprintf ((char *) message.data(), 20 ,
            "%lu %lu %lu", zipcode, temperature, relhumidity);
        publisher.send(message);
        // usleep(0);
        std::cout << "i, zipcode: "<< i << ", " << zipcode << std::endl;

        // Code to print missing packets
        if(i != zipcode)
            std::cout << "Missing Packet - Expected: "<< i << ", Sent: " << zipcode << std::endl;

    }
    now = std::chrono::system_clock::now();
    duration = now.time_since_epoch();
    auto timestamp4 = std::chrono::duration_cast<std::chrono::nanoseconds>(duration).count();
    unsigned long diff = timestamp4 - timestamp3;
    std::cout << "Time Taken (nanoseconds): " << diff << std::endl;

    zmq_sleep(300);
    return 0;
}

Commands:

Build =>

g++ pub.cpp -o pub -lstdc++ -lzmq

Run =>

./pub 22686

Subscriber Sample Code:

//
//  Weather update client in C++
//  Connects SUB socket to tcp://localhost:5556
//  Collects weather updates and finds avg temp in zipcode
//
//  Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com>
//
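#include <cppzmq-master/zmq.hpp>
#include <chrono>      // std::chrono::system_clock is used for timing below
#include <cstring>     // strlen
#include <iostream>
#include <sstream>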

using namespace std::chrono;

int main (int argc, char *argv[])
{
    zmq::context_t context (1);

    //  Socket to talk to server
    std::cout << "Collecting updates from weather server…\n" << std::endl;
    zmq::socket_t subscriber (context, ZMQ_SUB);
    subscriber.connect("tcp://localhost:5556");

    //  Subscribe to zipcode, default is NYC, 10001
    const char *filter = "";//(argc > 1)? argv [1]: "10001 ";
    subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter));
    unsigned long ticks_count = atoi(argv [1]);
    std::cout << "Ticks Count: " << ticks_count << std::endl;
    //  Process ticks_count updates
    unsigned long update_nbr;
    unsigned long total_temp = 0;

    std::chrono::time_point<std::chrono::system_clock> now = std::chrono::system_clock::now();
    auto duration = now.time_since_epoch();
    auto timestamp3 = std::chrono::duration_cast<std::chrono::nanoseconds>(duration).count();

    for (update_nbr = 1; update_nbr <= ticks_count; update_nbr++) {

        zmq::message_t update;
        unsigned long zipcode, temperature, relhumidity;

        subscriber.recv(&update);

        std::istringstream iss(static_cast<char*>(update.data()));
        iss >> zipcode >> temperature >> relhumidity ;

        total_temp += temperature;
        std::cout << "update_nbr, zipcode: "<< update_nbr << ", " << zipcode << std::endl;

        // Code to print missing packets
        if(update_nbr != zipcode)
            std::cout << "Missing Packet - Expected: "<< update_nbr << ", Received: " << zipcode << std::endl;
    }

    now = std::chrono::system_clock::now();
    duration = now.time_since_epoch();
    auto timestamp4 = std::chrono::duration_cast<std::chrono::nanoseconds>(duration).count();
    unsigned long diff = timestamp4 - timestamp3;
    std::cout << "Time Taken (nanoseconds): " << diff << std::endl;
    std::cout     << "Program Complete" << std::endl;
    return 0;
}

Commands:

Build =>

g++ sub.cpp -o sub -lstdc++ -lzmq

Run =>

./sub 22686

Welcome to the Zen-of-Zero: the results you observed are fully documented behaviour.

Reason:

When a ZMQ_PUB socket enters the mute state due to having reached the high water mark for a subscriber, then any messages that would be sent to the subscriber in question shall instead be dropped until the mute state ends. The zmq_send() function shall never block for this socket type.
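To make the quoted behaviour concrete, here is a minimal sketch that models it in plain C++ without any ZeroMQ calls. The bounded pipe, the `simulate_blind_publish` function and the fixed consumer pace are assumptions made for illustration only; a real PUB socket also has kernel and in-flight buffers that this model ignores.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <stdexcept>

// Hypothetical model, not ZeroMQ code: a per-subscriber pipe that holds at
// most `hwm` messages and silently drops anything sent while it is full.
struct DropStats {
    std::size_t delivered = 0;
    std::size_t dropped = 0;
};

// The "publisher" sends `total` messages back to back. The "subscriber"
// removes one message after every `sends_per_recv` sends (a slow consumer).
DropStats simulate_blind_publish(std::size_t total, std::size_t hwm,
                                 std::size_t sends_per_recv) {
    if (hwm == 0 || sends_per_recv == 0)
        throw std::invalid_argument("hwm and sends_per_recv must be positive");

    std::deque<std::size_t> pipe;
    DropStats stats;
    for (std::size_t seq = 1; seq <= total; ++seq) {
        if (pipe.size() >= hwm) {
            ++stats.dropped;            // mute state: send returns, message is lost
        } else {
            pipe.push_back(seq);
        }
        if (seq % sends_per_recv == 0 && !pipe.empty()) {
            pipe.pop_front();           // subscriber consumes one message
            ++stats.delivered;
        }
    }
    stats.delivered += pipe.size();     // subscriber eventually drains the rest
    assert(stats.delivered + stats.dropped == total);
    return stats;
}
```

For example, `simulate_blind_publish(22686, 1000, 2)` models the run from the question with a pipe limit of 1000 messages (the documented default high water mark in libzmq 3.x and later) and a subscriber that keeps up with only every second send. The publisher never blocks, and the returned `dropped` count is greater than zero.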

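Solution:

Either size the resources adequately,
or design a two-way handshake that adaptively throttles the cadence of the (otherwise blind) calls to publisher.send(message), depending on how well the receiving side can process the flow smoothly and in sync.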
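The handshake option can be sketched as credit-based flow control. The model below is plain C++ with no ZeroMQ calls, and every name in it (`simulate_credit_publish`, `window`, `ThrottleStats`) is hypothetical. In a real system the credits would have to travel back to the publisher over a second socket, which this sketch does not show.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <stdexcept>

// Hypothetical model, not ZeroMQ code: the publisher may send only while it
// holds credit, and the subscriber returns one credit per processed message.
struct ThrottleStats {
    std::size_t delivered = 0;
    std::size_t max_queued = 0;   // deepest the pipe ever got
};

ThrottleStats simulate_credit_publish(std::size_t total, std::size_t window) {
    if (window == 0)
        throw std::invalid_argument("window must be positive");

    std::deque<std::size_t> pipe;
    ThrottleStats stats;
    std::size_t credit = window;
    std::size_t next_seq = 1;

    while (stats.delivered < total) {
        // Publisher: send only while credit remains.
        while (credit > 0 && next_seq <= total) {
            pipe.push_back(next_seq++);
            --credit;
        }
        if (pipe.size() > stats.max_queued) stats.max_queued = pipe.size();

        // Subscriber: process one message, then grant one credit back.
        pipe.pop_front();
        ++stats.delivered;
        ++credit;

        // Messages in flight plus unused credit always equal the window.
        assert(credit + pipe.size() == window);
    }
    return stats;
}
```

As long as `window` does not exceed the high water mark, the pipe can never fill up, so nothing is dropped: `simulate_credit_publish(22686, 1000)` delivers all 22686 messages with at most 1000 queued at any time. The price is that the publisher now runs at the subscriber's pace.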


PUB side: add

int hwm_nMSGs = 0;                                   //  A value of zero means no limit
int buf_SizeB = static_cast<int>( 100 * ticks_count ); //  both options take an int in libzmq 3.x+
publisher.setsockopt( ZMQ_SNDBUF, &buf_SizeB, sizeof ( buf_SizeB ) );
publisher.setsockopt( ZMQ_SNDHWM, &hwm_nMSGs, sizeof ( hwm_nMSGs ) );
// _________________________________________________________ CONFIG b4 .bind()
publisher.bind( "tcp://*:5556" );
...
// ___________________________________________________________________ .unbind()
zmq_unbind( publisher, "tcp://*:5556" );                            // .unbind()
publisher.close();                                                  // .close() before .term(), else .term() blocks
context.close();                                                    // .term()

SUB side: add

int hwm_nMSGs = static_cast<int>(   3 * ticks_count ); //  both options take an int in libzmq 3.x+
int buf_SizeB = static_cast<int>( 100 * ticks_count );
subscriber.setsockopt( ZMQ_RCVHWM, &hwm_nMSGs, sizeof ( hwm_nMSGs ) );
subscriber.setsockopt( ZMQ_RCVBUF, &buf_SizeB, sizeof ( buf_SizeB ) );
// _________________________________________________________ CONFIG b4 .connect()
subscriber.connect( "tcp://localhost:5556" );
...
// ___________________________________________________________________ .close()
subscriber.close();                                                 // .close()
context.close();                                                    // .term()

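Last but not least
Never assume that network packets arrive in their natural order. Multi-path routed networks can and do deliver out of order, so the final condition, if(update_nbr != zipcode), will flag every out-of-order delivery, even when no message was dropped on the { PUB | SUB } side for lack of resources or lost in transit. That condition also compares against a plain loop counter, so after a single dropped message every later update is reported as missing.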
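A check that distinguishes a gap from a late arrival could look like the sketch below. `SequenceTracker` and `Arrival` are hypothetical names, and the sketch assumes the publisher numbers its messages 1, 2, 3, ... as the zipcode counter in the question does.

```cpp
#include <cstdint>
#include <set>

// How a newly received sequence number relates to what was seen before.
enum class Arrival { InOrder, Gap, Late, Duplicate };

// Hypothetical helper: tracks the highest sequence number seen so far and
// the set of numbers that were skipped and have not (yet) shown up.
class SequenceTracker {
public:
    Arrival observe(std::uint64_t seq) {
        if (seq > highest_) {
            const bool gap = (seq != highest_ + 1);
            for (std::uint64_t s = highest_ + 1; s < seq; ++s)
                missing_.insert(s);          // provisional: may still arrive late
            highest_ = seq;
            return gap ? Arrival::Gap : Arrival::InOrder;
        }
        // seq <= highest_: either a skipped message arriving late, or a repeat.
        return missing_.erase(seq) > 0 ? Arrival::Late : Arrival::Duplicate;
    }

    // Number of skipped messages that have not arrived so far.
    std::size_t missing_count() const { return missing_.size(); }

private:
    std::uint64_t highest_ = 0;              // sequence numbers start at 1
    std::set<std::uint64_t> missing_;
};
```

In the subscriber loop you would call `tracker.observe(zipcode)` for each update and read `tracker.missing_count()` after the loop. One dropped message then shows up as one `Gap`, not as a mismatch on every later update.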

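Since there is no indication of which ZeroMQ version / language binding / wrapper your code uses or will use, be aware that the default value of ZMQ_LINGER differs between them and may cause your code to hang: if not all messages have been transmitted and no receiving process retrieves those pending messages from the queue, termination can wait indefinitely. The best way to avoid this is to always control the ZMQ_LINGER setting explicitly on every zmq_socket instance and to shut down cleanly via .unbind(), .close(), .term().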

For a robust and clean design, this is a fair and proper practice.