您如何使用 ZeroMQ 开发一个简单的 DEALER/ROUTER 消息流?
How could you develop a simple DEALER/ROUTER message flow, using ZeroMQ?
我对 TCP 消息传递(和一般编程)相当陌生,我正在尝试使用 ZeroMQ 开发一个简单的 ROUTER/DEALER 消息对,但我正在努力让路由器接收来自经销商的消息并寄回一份。
我可以毫无问题地执行一个简单的 REQ/REP
模式,我可以从我的机器向我的 VM 发送一条消息。
但是,在尝试开发 ROUTER/DEALER
对时,我似乎无法获得 ROUTER
-实例来接收消息(ROUTER
在 VM 上,DEALER
在主机上)。我取得了一些成功,我可以在 while(){...}
循环中发送 50 条垃圾邮件,但无法发送一条消息并让 ROUTER
发回一条。
所以据我所知,ROUTER/DEALER
对中的 TCP 消息在开始时以 0 分隔符发送,并且必须先将此 0 发送到 ROUTER
进行注册收到的消息。
所以我只想将消息 "ROUTER_TEST" 发送到我的服务器,并让我的服务器响应 "RECEIVED".
经销商
#include <cstdlib>
#include <iostream>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <assert.h>
#include <stdio.h>
#include "zmq.h"
const char connection[] = "tcp://10.0.10.76:5555";
int main(void)
{
int major, minor, patch;
zmq_version(&major, &minor, &patch);
printf("\nInstalled ZeroMQ version: %d.%d.%d\n", major, minor, patch);
printf("Connecting to: %s\n", connection);
void *context = zmq_ctx_new();
void *requester = zmq_socket(context, ZMQ_DEALER);
int zc = zmq_connect(requester, connection);
std::cout << "zmq_connect = " << zc << std::endl;
int sm = zmq_socket_monitor(requester, connection, ZMQ_EVENT_ALL);
std::cout << "zmq_socket_monitor = " << sm << std::endl;
char messageSend[] = "ROUTER_TEST";
int request_nbr;
int n = zmq_send(requester, NULL, 0, ZMQ_DONTWAIT|ZMQ_SNDMORE );
int ii = 0;
if(n==0) {
std::cout << "n = " << n << std::endl;
while (ii < 50)
{
n = zmq_send(requester, messageSend, 31, ZMQ_DONTWAIT);
ii++;
}
}
return 0;
}
路由器
// SERVER
#include <cstdlib>
#include <iostream>
#include <string.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include "zmq.h"
int main(void)
{
void *context = zmq_ctx_new();
void *responder = zmq_socket(context, ZMQ_ROUTER);
printf("THIS IS WORKING - ROUTER\n");
int rc = zmq_bind(responder, "tcp://*:5555");
assert(rc == 0);
zmq_pollitem_t pollItems[] = {
{responder, 0, ZMQ_POLLIN, -1}};
int sm = zmq_socket_monitor(responder, "tcp://*:5555", ZMQ_EVENT_LISTENING);
std::cout << "zmq_socket_monitor = " << sm << std::endl;
uint8_t buffer[15];
while (1)
{
int rc = zmq_recv(responder, buffer, 5, ZMQ_DONTWAIT);
if (rc == 0)
{
std::cout << "zmq_recv = " << rc << std::endl;
zmq_send(responder, "RECIEVED", 9,0);
}
zmq_poll(pollItems, sizeof(pollItems), -1);
}
return 0;
}
欢迎来到零之禅。
如果你从未使用过 ZeroMQ,
你可能会喜欢先看看 ,然后再深入了解更多细节
您的代码在 DEALER
端调用了一系列:
void *requester = zmq_socket( context,
ZMQ_DEALER // <-- .STO <ZMQ_DEALER>, *requester
);
...
int n = zmq_send( requester, // <~~ <~~ <~~ <~~ <~~ <~~ .STO 0, n
NULL, // NULL,sizeof(NULL)== 0
0, // explicitly declared 0
ZMQ_DONTWAIT // _DONTWAIT flag
| ZMQ_SNDMORE //---- 1x ZMQ_SNDMORE flag==
); // 1.Frame in 1st MSG
int ii = 0; // MSG-under-CONSTRUCTION
if ( n == 0 ) // ...until complete, not yet sent
{
std::cout << "PREVIOUS[" << ii << ".] CALL of zmq_send() has returned n = " << n << std::endl;
while ( ii < 50 )
{ ii++;
n = zmq_send( requester, //---------//---- 1x ZMQ_SNDMORE following
messageSend, // // 2.Frame in 1st MSG
31, // // MSG-under-CONSTRUCTION, but
ZMQ_DONTWAIT // // NOW complete & will get sent
); //---------//----49x monoFrame MSGs follow
}
}
...
对面发生了什么,ROUTER
端代码?
...
while (1)
{
int rc = zmq_recv( responder, //----------------- 1st .recv()
buffer,
5,
ZMQ_DONTWAIT
);
if ( rc == 0 )
{
std::cout << "zmq_recv = " << rc << std::endl;
zmq_send( responder, // _____________________ void *socket
"RECEIVED", // _____________________ void *buffer
9, // _____________________________ size_t len
0 // _____________________________ int flags
);
}
zmq_poll( pollItems,
sizeof( pollItems ),
-1 // __________________________________ block ( forever )
);// till ( if ever ) ...?
}
在这里,最有可能的是,rc == 0
但是一旦没有错过,就再也不会了
请注意,您的代码不会以任何方式检测 .recv()
调用是否也被 ZMQ_RECVMORE
标记 - 发出信号在能够 .send()
-any-answer...
之前,还需要首先 .recv()
-第一条消息的所有其余多帧部分
An application that processes multi-part messages must use the ZMQ_RCVMORE
zmq_getsockopt(3)
option after calling zmq_recv()
to determine if there are further parts to receive.
接下来,buffer
和messageSend
消息-"payloads"是一种脆弱的实体,应该重新组合(有关详细信息,最好再次阅读有关如何仔细初始化、使用和安全触摸任何 zmq_msg_t
对象的所有详细信息),在成功 .send()/.recv()
之后,低级别 API (自 2.11.x+ 起)认为它们已被丢弃,不可重复使用。另请注意,messageSend
不是 而不是 (如强制放入代码中那样)是 31-char[]
-长,是吗?这样做有什么特别的意图吗?
The zmq_send()
function shall return number of bytes in the message if successful. Otherwise it shall return -1
and set errno
to one of the values defined below. { EAGAIN, ENOTSUP, EINVAL, EFSM, ETERM, ENOTSOCK, EINTR, EHOSTUNREACH }
不测试错误状态意味着对 REQ/REP
和 DEALER/ROUTER
(扩展)的实际状态一无所知(参见 EFSM
和其他潜在的问题解释器).send()/.recv()/.send()/.recv()/...
强制性 dFSA 对这些步骤的顺序
"So from what I have read, a TCP message in a ROUTER/DEALER
pair are sent with a delimiter of 0 at the beginning, and this 0 must be sent first to the ROUTER
to register an incoming message."
这似乎是一个被误解的部分。应用程序端可以自由组合任意数量的单帧或多帧消息,但是 ROUTER
前置标识帧的 "trick" 是在没有用户协助的情况下执行的(消息标记是自动执行的,在任何(现在,主要是所有)多帧(d)消息被传递到应用程序端(使用接收方.recv()
-方法)之前。上面提到了对多帧消息的适当处理。
我对 TCP 消息传递(和一般编程)相当陌生,我正在尝试使用 ZeroMQ 开发一个简单的 ROUTER/DEALER 消息对,但我正在努力让路由器接收来自经销商的消息并寄回一份。
我可以毫无问题地执行一个简单的 REQ/REP
模式,我可以从我的机器向我的 VM 发送一条消息。
但是,在尝试开发 ROUTER/DEALER
对时,我似乎无法获得 ROUTER
-实例来接收消息(ROUTER
在 VM 上,DEALER
在主机上)。我取得了一些成功,我可以在 while(){...}
循环中发送 50 条垃圾邮件,但无法发送一条消息并让 ROUTER
发回一条。
所以据我所知,ROUTER/DEALER
对中的 TCP 消息在开始时以 0 分隔符发送,并且必须先将此 0 发送到 ROUTER
进行注册收到的消息。
所以我只想将消息 "ROUTER_TEST" 发送到我的服务器,并让我的服务器响应 "RECEIVED".
经销商
#include <cstdlib>
#include <iostream>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <assert.h>
#include <stdio.h>
#include "zmq.h"
const char connection[] = "tcp://10.0.10.76:5555";
int main(void)
{
int major, minor, patch;
zmq_version(&major, &minor, &patch);
printf("\nInstalled ZeroMQ version: %d.%d.%d\n", major, minor, patch);
printf("Connecting to: %s\n", connection);
void *context = zmq_ctx_new();
void *requester = zmq_socket(context, ZMQ_DEALER);
int zc = zmq_connect(requester, connection);
std::cout << "zmq_connect = " << zc << std::endl;
int sm = zmq_socket_monitor(requester, connection, ZMQ_EVENT_ALL);
std::cout << "zmq_socket_monitor = " << sm << std::endl;
char messageSend[] = "ROUTER_TEST";
int request_nbr;
int n = zmq_send(requester, NULL, 0, ZMQ_DONTWAIT|ZMQ_SNDMORE );
int ii = 0;
if(n==0) {
std::cout << "n = " << n << std::endl;
while (ii < 50)
{
n = zmq_send(requester, messageSend, 31, ZMQ_DONTWAIT);
ii++;
}
}
return 0;
}
路由器
// SERVER
#include <cstdlib>
#include <iostream>
#include <string.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include "zmq.h"
int main(void)
{
void *context = zmq_ctx_new();
void *responder = zmq_socket(context, ZMQ_ROUTER);
printf("THIS IS WORKING - ROUTER\n");
int rc = zmq_bind(responder, "tcp://*:5555");
assert(rc == 0);
zmq_pollitem_t pollItems[] = {
{responder, 0, ZMQ_POLLIN, -1}};
int sm = zmq_socket_monitor(responder, "tcp://*:5555", ZMQ_EVENT_LISTENING);
std::cout << "zmq_socket_monitor = " << sm << std::endl;
uint8_t buffer[15];
while (1)
{
int rc = zmq_recv(responder, buffer, 5, ZMQ_DONTWAIT);
if (rc == 0)
{
std::cout << "zmq_recv = " << rc << std::endl;
zmq_send(responder, "RECIEVED", 9,0);
}
zmq_poll(pollItems, sizeof(pollItems), -1);
}
return 0;
}
欢迎来到零之禅。
如果你从未使用过 ZeroMQ,
你可能会喜欢先看看
您的代码在 DEALER
端调用了一系列:
void *requester = zmq_socket( context,
ZMQ_DEALER // <-- .STO <ZMQ_DEALER>, *requester
);
...
int n = zmq_send( requester, // <~~ <~~ <~~ <~~ <~~ <~~ .STO 0, n
NULL, // NULL,sizeof(NULL)== 0
0, // explicitly declared 0
ZMQ_DONTWAIT // _DONTWAIT flag
| ZMQ_SNDMORE //---- 1x ZMQ_SNDMORE flag==
); // 1.Frame in 1st MSG
int ii = 0; // MSG-under-CONSTRUCTION
if ( n == 0 ) // ...until complete, not yet sent
{
std::cout << "PREVIOUS[" << ii << ".] CALL of zmq_send() has returned n = " << n << std::endl;
while ( ii < 50 )
{ ii++;
n = zmq_send( requester, //---------//---- 1x ZMQ_SNDMORE following
messageSend, // // 2.Frame in 1st MSG
31, // // MSG-under-CONSTRUCTION, but
ZMQ_DONTWAIT // // NOW complete & will get sent
); //---------//----49x monoFrame MSGs follow
}
}
...
对面发生了什么,ROUTER
端代码?
...
while (1)
{
int rc = zmq_recv( responder, //----------------- 1st .recv()
buffer,
5,
ZMQ_DONTWAIT
);
if ( rc == 0 )
{
std::cout << "zmq_recv = " << rc << std::endl;
zmq_send( responder, // _____________________ void *socket
"RECEIVED", // _____________________ void *buffer
9, // _____________________________ size_t len
0 // _____________________________ int flags
);
}
zmq_poll( pollItems,
sizeof( pollItems ),
-1 // __________________________________ block ( forever )
);// till ( if ever ) ...?
}
在这里,最有可能的是,rc == 0
但是一旦没有错过,就再也不会了
请注意,您的代码不会以任何方式检测 .recv()
调用是否也被 ZMQ_RECVMORE
标记 - 发出信号在能够 .send()
-any-answer...
.recv()
-第一条消息的所有其余多帧部分
An application that processes multi-part messages must use the
ZMQ_RCVMORE
zmq_getsockopt(3)
option after callingzmq_recv()
to determine if there are further parts to receive.
接下来,buffer
和messageSend
消息-"payloads"是一种脆弱的实体,应该重新组合(有关详细信息,最好再次阅读有关如何仔细初始化、使用和安全触摸任何 zmq_msg_t
对象的所有详细信息),在成功 .send()/.recv()
之后,低级别 API (自 2.11.x+ 起)认为它们已被丢弃,不可重复使用。另请注意,messageSend
不是 而不是 (如强制放入代码中那样)是 31-char[]
-长,是吗?这样做有什么特别的意图吗?
The
zmq_send()
function shall return number of bytes in the message if successful. Otherwise it shall return-1
and seterrno
to one of the values defined below.{ EAGAIN, ENOTSUP, EINVAL, EFSM, ETERM, ENOTSOCK, EINTR, EHOSTUNREACH }
不测试错误状态意味着对 REQ/REP
和 DEALER/ROUTER
(扩展)的实际状态一无所知(参见 EFSM
和其他潜在的问题解释器).send()/.recv()/.send()/.recv()/...
强制性 dFSA 对这些步骤的顺序
"So from what I have read, a TCP message in a
ROUTER/DEALER
pair are sent with a delimiter of 0 at the beginning, and this 0 must be sent first to theROUTER
to register an incoming message."
这似乎是一个被误解的部分。应用程序端可以自由组合任意数量的单帧或多帧消息,但是 ROUTER
前置标识帧的 "trick" 是在没有用户协助的情况下执行的(消息标记是自动执行的,在任何(现在,主要是所有)多帧(d)消息被传递到应用程序端(使用接收方.recv()
-方法)之前。上面提到了对多帧消息的适当处理。