ZMQ_CONFLATE socket option causes no messages to be received

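I am trying to work out why enabling the ZMQ_CONFLATE option causes no messages to be received.

I have reproduced the problem in this minimal test case (my application uses an XPUB/XSUB proxy, but that does not seem to change the outcome of this test):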

#include <atomic>
#include <boost/lexical_cast.hpp>
#include <boost/thread.hpp>
#include <cstdlib>   // exit
#include <cstring>   // memcpy
#include <iostream>
#include <string>
#include <zmq.hpp>

#define USE_PROXY

std::atomic<bool> stop{false};

void pub_thread(zmq::context_t &context)
{
    zmq::socket_t pub(context, zmq::socket_type::pub);
#ifdef USE_PROXY
    pub.connect("tcp://localhost:38922");
#else
    pub.bind("tcp://*:38923");
#endif
    long i = 0;
    for(;;)
    {
        if(stop) break;
        std::string m = boost::lexical_cast<std::string>(i);
        zmq::message_t hdr(6);
        memcpy(hdr.data(), "topic1", 6);
        zmq::message_t msg(m.size());
        memcpy(msg.data(), m.data(), m.size());
        std::cout << "send: " << m << std::endl;
        if(!pub.send(hdr, ZMQ_SNDMORE) || !pub.send(msg))
            std::cout << "send error" << std::endl;
        i++;
        boost::this_thread::sleep_for(boost::chrono::milliseconds{20});
    }
}

void sub_thread(zmq::context_t &context)
{
    zmq::socket_t sub(context, zmq::socket_type::sub);
    const int v_true = 1;
    sub.setsockopt(ZMQ_CONFLATE, &v_true, sizeof(v_true));
#ifdef USE_PROXY
    sub.connect("tcp://localhost:38921");
#else
    sub.connect("tcp://localhost:38923");
#endif
    sub.setsockopt(ZMQ_SUBSCRIBE, "topic1", 6);
    for(;;)
    {
        if(stop) break;
        zmq::message_t hdr, msg;
        if(!sub.recv(&hdr) || !hdr.more() || !sub.recv(&msg))
            std::cout << "recv error" << std::endl;
        std::string m(reinterpret_cast<const char*>(msg.data()), msg.size());
        std::cout << "                recv: " << m << std::endl;
        boost::this_thread::sleep_for(boost::chrono::milliseconds{250});
    }
}

void proxy_thread(zmq::context_t &context)
{
#ifdef USE_PROXY
    zmq::socket_t xpub(context, zmq::socket_type::xpub);
    xpub.bind("tcp://*:38921");
    zmq::socket_t xsub(context, zmq::socket_type::xsub);
    xsub.bind("tcp://*:38922");
    std::cout << "starting xpub/xsub proxy" << std::endl;
    zmq::proxy(xpub, xsub, nullptr);
    std::cout << "xpub/xsub proxy terminated" << std::endl;
#endif
}

void timeout_thread()
{
    boost::this_thread::sleep_for(boost::chrono::seconds{4});
    stop = true;
    boost::this_thread::sleep_for(boost::chrono::seconds{1});
    exit(0);
}

int main(int argc, char **argv)
{
    zmq::context_t context(1);
    boost::thread t0(&timeout_thread);
    boost::thread t1(&proxy_thread, boost::ref(context));
    boost::this_thread::sleep_for(boost::chrono::seconds{1});
    boost::thread t2(&sub_thread, boost::ref(context));
    boost::this_thread::sleep_for(boost::chrono::seconds{1});
    boost::thread t3(&pub_thread, boost::ref(context));
    t0.join();
}

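Quick description: there are 4 threads: a publisher thread that sends one two-part message every 20 ms, a subscriber thread that receives one message every 250 ms, an XPUB/XSUB proxy thread, and a timeout thread that sets the stop flag after 4 seconds and exits the process one second later.

The output I observe is the following: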

starting xpub/xsub proxy
send: 0
send: 1
send: 2
send: 3
send: 4
send: 5
send: 6
send: 7
send: 8
send: 9
send: 10
send: 11
send: 12
send: 13
send: 14
send: 15
send: 16
send: 17
send: 18
send: 19
send: 20
send: 21
send: 22
send: 23
send: 24
send: 25
send: 26
send: 27
send: 28
send: 29
send: 30
send: 31
send: 32
send: 33
send: 34
send: 35
send: 36
send: 37
send: 38
send: 39
send: 40
send: 41
send: 42
send: 43
send: 44
send: 45
send: 46
send: 47
send: 48
send: 49
send: 50
send: 51
send: 52
send: 53
send: 54
send: 55
send: 56
send: 57
send: 58
send: 59
send: 60
send: 61
send: 62
send: 63
send: 64
send: 65
send: 66
send: 67
send: 68
send: 69
send: 70
send: 71
send: 72
send: 73
send: 74
send: 75
send: 76
send: 77
send: 78
send: 79
send: 80
send: 81
send: 82
send: 83
send: 84
send: 85
send: 86
send: 87

i.e. no messages are received at all.

The expected output would be something like this:

starting xpub/xsub proxy
send: 0
send: 1
                recv: 1
send: 2
send: 3
send: 4
send: 5
send: 6
send: 7
send: 8
send: 9
send: 10
send: 11
                recv: 11
send: 12
send: 13
send: 14
send: 15
send: 16
send: 17
send: 18
send: 19
send: 20
send: 21
send: 22
                recv: 21
send: 23
send: 24
send: 25
send: 26
send: 27
send: 28
send: 29
send: 30
send: 31
send: 32
send: 33
send: 34
                recv: 33
send: 35
send: 36
send: 37
send: 38
send: 39
send: 40
send: 41
send: 42
send: 43
send: 44
send: 45
                recv: 45
send: 46
send: 47
send: 48
send: 49
send: 50
send: 51
send: 52
send: 53
send: 54
send: 55
send: 56
                recv: 55
send: 57
send: 58
send: 59
send: 60
send: 61
send: 62
send: 63
send: 64
send: 65
send: 66
send: 67
                recv: 66
send: 68
send: 69
send: 70
send: 71
send: 72
send: 73
send: 74
send: 75
send: 76
send: 77
send: 78
                recv: 77
send: 79
send: 80
send: 81
send: 82
send: 83
send: 84
send: 85
send: 86
send: 87

I have also tried moving sub.setsockopt(ZMQ_CONFLATE, ...) after sub.connect(...), but in that case the option has no effect; the result is the same as removing the ZMQ_CONFLATE line altogether:

starting xpub/xsub proxy
send: 0
send: 1
                recv: 1
send: 2
send: 3
send: 4
send: 5
send: 6
send: 7
send: 8
send: 9
send: 10
send: 11
                recv: 2
send: 12
send: 13
send: 14
send: 15
send: 16
send: 17
send: 18
send: 19
send: 20
send: 21
send: 22
                recv: 3
send: 23
send: 24
send: 25
send: 26
send: 27
send: 28
send: 29
send: 30
send: 31
send: 32
send: 33
send: 34
                recv: 4
send: 35
send: 36
send: 37
send: 38
send: 39
send: 40
send: 41
send: 42
send: 43
send: 44
send: 45
                recv: 5
send: 46
send: 47
send: 48
send: 49
send: 50
send: 51
send: 52
send: 53
send: 54
send: 55
send: 56
                recv: 6
send: 57
send: 58
send: 59
send: 60
send: 61
send: 62
send: 63
send: 64
send: 65
send: 66
send: 67
                recv: 7
send: 68
send: 69
send: 70
send: 71
send: 72
send: 73
send: 74
send: 75
send: 76
send: 77
send: 78
                recv: 8
send: 79
send: 80
send: 81
send: 82
send: 83
send: 84
send: 85
send: 86
send: 87

ZMQ version: 4.2.5

In your subscribing socket, try setting ZMQ_CONFLATE before connecting.

From the documentation:

http://api.zeromq.org/4-2:zmq-setsockopt

Caution: All options, with the exception of ZMQ_SUBSCRIBE, ZMQ_UNSUBSCRIBE, ZMQ_LINGER, ZMQ_ROUTER_HANDOVER, ZMQ_ROUTER_MANDATORY, ZMQ_PROBE_ROUTER, ZMQ_XPUB_VERBOSE, ZMQ_XPUB_VERBOSER, ZMQ_REQ_CORRELATE, ZMQ_REQ_RELAXED, ZMQ_SNDHWM and ZMQ_RCVHWM, only take effect for subsequent socket bind/connects.

Nice sample code, and it is good to see your expected results.

You are using multi-part messages, which are not compatible with ZMQ_CONFLATE:

ZMQ_CONFLATE: Keep only last message If set, a socket shall keep only one message in its inbound/outbound queue, this message being the last message received/the last message to be sent. Ignores ZMQ_RECVHWM and ZMQ_SENDHWM options. Does not supports multi-part messages, in particular, only one part of it is kept in the socket internal queue.
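
To make the "only one part of it is kept" sentence concrete, here is a toy model of a one-slot queue. It is only a sketch of the documented behaviour, not libzmq's actual implementation; Frame, ConflatingSlot and deliver_two_part are hypothetical names made up for this illustration.

```cpp
#include <string>
#include <utility>
#include <vector>

// One frame of a message; `more` mirrors the "more parts follow" flag.
struct Frame {
    std::string data;
    bool more;
};

// Hypothetical model of a conflating queue: a single slot that every
// write overwrites, so only the most recently written frame survives.
class ConflatingSlot {
public:
    ConflatingSlot() : has_value_(false) {}

    void write(const Frame &f) {
        slot_ = f;
        has_value_ = true;
    }

    // Returns false when the slot is empty; otherwise hands out the frame
    // and empties the slot.
    bool read(Frame &out) {
        if (!has_value_) return false;
        out = std::move(slot_);
        has_value_ = false;
        return true;
    }

private:
    Frame slot_;
    bool has_value_;
};

// Writes a two-part message (topic frame, then body frame) before the
// reader gets a chance to run, and returns every frame the reader sees.
std::vector<Frame> deliver_two_part(const std::string &topic,
                                    const std::string &body) {
    ConflatingSlot slot;
    slot.write(Frame{topic, true});   // header part, more to follow
    slot.write(Frame{body, false});   // body part overwrites the header
    std::vector<Frame> seen;
    Frame f;
    while (slot.read(f)) seen.push_back(f);
    return seen;
}
```

In this model deliver_two_part("topic1", "42") hands the reader a single frame containing "42", so the "topic1" header that your subscription and your hdr.more() check rely on is gone. That is at least a plausible reading of why nothing comes out of your two-part recv loop.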

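If you change your code to send a single message part (the integer) and subscribe to everything with sub.setsockopt(ZMQ_SUBSCRIBE, "", 0); the code produces the results you expect.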
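
If you still need topic filtering, one alternative (not what I tested above, so treat it as a sketch) is to pack the topic and the counter into a single frame. SUB sockets match subscriptions as a prefix of the message, so a subscription to "topic1" would still match a frame such as "topic1 42". The helpers encode_frame and decode_frame below are hypothetical names for illustration, not part of zmq.hpp.

```cpp
#include <cstddef>
#include <exception>
#include <string>

// Hypothetical helper: builds ONE frame of the form "<topic> <value>",
// to be sent with a single pub.send(...) and no ZMQ_SNDMORE.
std::string encode_frame(const std::string &topic, long value) {
    return topic + ' ' + std::to_string(value);
}

// Hypothetical helper: extracts the counter from a frame built by
// encode_frame. Returns false if the frame does not start with
// "<topic> " or if the remainder is not a complete integer.
bool decode_frame(const std::string &frame, const std::string &topic,
                  long &value) {
    const std::string prefix = topic + ' ';
    if (frame.compare(0, prefix.size(), prefix) != 0) return false;
    const std::string rest = frame.substr(prefix.size());
    try {
        std::size_t pos = 0;
        const long parsed = std::stol(rest, &pos);
        if (pos != rest.size()) return false;  // trailing garbage
        value = parsed;
        return true;
    } catch (const std::exception &) {
        return false;  // empty or non-numeric payload
    }
}
```

On the publisher side you would copy encode_frame("topic1", i) into one zmq::message_t and send it on its own; on the subscriber side you would keep sub.setsockopt(ZMQ_SUBSCRIBE, "topic1", 6), receive one frame, and pass its contents to decode_frame.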