您如何使用 ZeroMQ 开发一个简单的 DEALER/ROUTER 消息流?

How could you develop a simple DEALER/ROUTER message flow, using ZeroMQ?

我对 TCP 消息传递(和一般编程)相当陌生,我正在尝试使用 ZeroMQ 开发一个简单的 ROUTER/DEALER 消息对,但我正在努力让路由器接收来自经销商的消息并寄回一份。

我可以毫无问题地执行一个简单的 REQ/REP 模式,我可以从我的机器向我的 VM 发送一条消息。

但是,在尝试开发 ROUTER/DEALER 对时,我似乎无法获得 ROUTER-实例来接收消息(ROUTER 在 VM 上,DEALER 在主机上)。我取得了一些成功,我可以在 while(){...} 循环中发送 50 条垃圾邮件,但无法发送一条消息并让 ROUTER 发回一条。

所以据我所知,ROUTER/DEALER 对中的 TCP 消息在开始时以 0 分隔符发送,并且必须先将此 0 发送到 ROUTER 进行注册收到的消息。

所以我只想将消息 "ROUTER_TEST" 发送到我的服务器,并让我的服务器响应 "RECEIVED".

经销商

#include <cstdlib>
#include <iostream>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <assert.h>
#include <stdio.h>

#include "zmq.h"

const char connection[] = "tcp://10.0.10.76:5555";
int main(void)
{
    int major, minor, patch;
    zmq_version(&major, &minor, &patch);
    printf("\nInstalled ZeroMQ version: %d.%d.%d\n", major, minor, patch);
    printf("Connecting to: %s\n", connection);

    void *context = zmq_ctx_new();

    void *requester = zmq_socket(context, ZMQ_DEALER);

    int zc = zmq_connect(requester, connection); 
    std::cout << "zmq_connect = " << zc << std::endl;

    int sm = zmq_socket_monitor(requester, connection, ZMQ_EVENT_ALL);
    std::cout << "zmq_socket_monitor = " << sm << std::endl;

    char messageSend[] = "ROUTER_TEST";

    int request_nbr;
    int n = zmq_send(requester, NULL, 0, ZMQ_DONTWAIT|ZMQ_SNDMORE );
    int ii = 0;
    if(n==0) {
        std::cout << "n = " << n << std::endl;
    while (ii < 50)
    {
        n = zmq_send(requester, messageSend, 31, ZMQ_DONTWAIT);

        ii++;
    }
    }

    return 0;
}

路由器

// SERVER
#include <cstdlib>
#include <iostream>
#include <string.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>

#include "zmq.h"

int main(void)
{
    void *context = zmq_ctx_new();
    void *responder = zmq_socket(context, ZMQ_ROUTER);
    printf("THIS IS WORKING - ROUTER\n");
    int rc = zmq_bind(responder, "tcp://*:5555");
    assert(rc == 0);

    zmq_pollitem_t pollItems[] = {
        {responder, 0, ZMQ_POLLIN, -1}};

    int sm = zmq_socket_monitor(responder, "tcp://*:5555", ZMQ_EVENT_LISTENING);
    std::cout << "zmq_socket_monitor = " << sm << std::endl;
    uint8_t buffer[15];
    while (1)
    {
        int rc = zmq_recv(responder, buffer, 5, ZMQ_DONTWAIT);
        if (rc == 0)
        {
            std::cout << "zmq_recv = " << rc << std::endl;
            zmq_send(responder, "RECIEVED", 9,0);
        }

        zmq_poll(pollItems, sizeof(pollItems), -1);

    }
    return 0;
}

欢迎来到零之禅。

如果你从未使用过 ZeroMQ,
你可能会喜欢先看看 ,然后再深入了解更多细节


您的代码在 DEALER 端调用了一系列:

void *requester = zmq_socket( context,
                              ZMQ_DEALER           // <--   .STO <ZMQ_DEALER>, *requester
                              );
...
int n   = zmq_send( requester, // <~~ <~~ <~~ <~~ <~~ <~~   .STO 0, n
                    NULL,     //                             NULL,sizeof(NULL)== 0
                    0,       //                              explicitly declared 0
                    ZMQ_DONTWAIT                   //           _DONTWAIT flag
                  | ZMQ_SNDMORE                    //---- 1x ZMQ_SNDMORE  flag== 
                    );                             //        1.Frame in 1st MSG
int ii  = 0;                                       //        MSG-under-CONSTRUCTION
if ( n == 0 )                                      //     ...until complete, not yet sent
{    
     std::cout << "PREVIOUS[" << ii << ".] CALL of zmq_send() has returned n = " << n << std::endl;

     while ( ii < 50 )
     {       ii++;
             n = zmq_send( requester,   //---------//---- 1x ZMQ_SNDMORE following
                           messageSend, //         //        2.Frame in 1st MSG
                           31,          //         //        MSG-under-CONSTRUCTION, but
                           ZMQ_DONTWAIT //         //        NOW complete & will get sent
                           );           //---------//----49x monoFrame MSGs follow
     }
}
...

对面发生了什么,ROUTER端代码?

...
while (1)
{       
        int  rc  = zmq_recv( responder, //----------------- 1st .recv()
                             buffer,
                             5,
                             ZMQ_DONTWAIT
                             );
        if ( rc == 0 )
        {
            std::cout << "zmq_recv = " << rc << std::endl;
            zmq_send( responder,  // _____________________ void  *socket
                      "RECEIVED", // _____________________ void  *buffer
                      9,  // _____________________________ size_t len
                      0   // _____________________________ int    flags
                      );
        }
        zmq_poll( pollItems,
                  sizeof( pollItems ),
                  -1 // __________________________________ block ( forever )
                  );//                                     till  ( if ever ) ...?
}

在这里,最有可能的是,rc == 0但是一旦没有错过,就再也不会了

请注意,您的代码不会以任何方式检测 .recv() 调用是否也被 ZMQ_RECVMORE 标记 - 发出信号在能够 .send()-any-answer...

之前,还需要首先 .recv()-第一条消息的所有其余多帧部分

An application that processes multi-part messages must use the ZMQ_RCVMORE zmq_getsockopt(3) option after calling zmq_recv() to determine if there are further parts to receive.

接下来,buffermessageSend消息-"payloads"是一种脆弱的实体,应该重新组合(有关详细信息,最好再次阅读有关如何仔细初始化、使用和安全触摸任何 zmq_msg_t 对象的所有详细信息),在成功 .send()/.recv() 之后,低级别 API (自 2.11.x+ 起)认为它们已被丢弃,不可重复使用。另请注意,messageSend 不是 而不是 (如强制放入代码中那样)是 31-char[]-长,是吗?这样做有什么特别的意图吗?

The zmq_send() function shall return number of bytes in the message if successful. Otherwise it shall return -1 and set errno to one of the values defined below. { EAGAIN, ENOTSUP, EINVAL, EFSM, ETERM, ENOTSOCK, EINTR, EHOSTUNREACH }

不测试错误状态意味着对 REQ/REPDEALER/ROUTER(扩展)的实际状态一无所知(参见 EFSM 和其他潜在的问题解释器).send()/.recv()/.send()/.recv()/...强制性 dFSA 对这些步骤的顺序


"So from what I have read, a TCP message in a ROUTER/DEALER pair are sent with a delimiter of 0 at the beginning, and this 0 must be sent first to the ROUTER to register an incoming message."

这似乎是一个被误解的部分。应用程序端可以自由组合任意数量的单帧或多帧消息,但是 ROUTER 前置标识帧的 "trick" 是在没有用户协助的情况下执行的(消息标记是自动执行的,在任何(现在,主要是所有)多帧(d)消息被传递到应用程序端(使用接收方.recv()-方法)之前。上面提到了对多帧消息的适当处理。