ZeroMQ:如何使用多个发布者和一个客户端,使用 C < C11
ZeroMQ: how to use multiple Publishers and a single Client, using C < C11
我是 ZeroMQ 新手。
我有多个发布者和一个客户。寻求建议以最佳方式实施它。
目前,它对单个客户端和服务器使用回复 - 请求模式;这必须扩展到多个发布者和一个订阅者。
此应用程序将在不支持 C11 的 QNX 系统上 运行,因此 zmq::multipart_t
无济于事。
void TransportLayer::Init()
{
socket.bind( "tcp://*:5555" );
}
void TransportLayer::Receive()
{
while ( true ) {
zmq::message_t request;
string protoBuf;
socket.recv( &request );
uint16_t id = *( (uint16_t*)request.data() );
protoBuf = std::string( static_cast<char*>( request.data()
+ sizeof( uint16_t )
),
request.size() - sizeof( uint16_t )
);
InterfaceLayer::getInstance()->ParseProtoBufTable( protoBuf );
}
Send();
usleep( 1 );
}
void TransportLayer::Send()
{
zmq::message_t reply( 1 );
memcpy( reply.data(), "#", 1 );
socket.send( reply );
}
这是我写的代码,最初设计为只听一个客户端,现在我必须扩展它以听多个客户端。
我尝试使用 zmq::multipart_t
但这需要 C11 支持,但我们使用的 QNX 版本不支持 C11。
我尝试实施建议的解决方案。
我创建了 2 个连接到相同静态位置的发布者。
观察:
我)
执行顺序:
1. 启动订阅者
2。启动 Publisher1(它只发布了一个数据值)
订户错过了接收此数据。
II )
修改 Publisher1 以在 while 循环中发送相同的数据
执行顺序:
1. 启动订阅者
2。已启动 Publisher1
3。已启动 Publsiher2。
现在我看到订阅者正在接收来自两个发布者的数据。
这表明可能会丢失数据。
如何保证数据绝对不丢失?
这是我的源代码:
出版商 2:
dummyFrontEnd::dummyFrontEnd():context(1),socket(context,ZMQ_PUB) {
}
void dummyFrontEnd::Init()
{
socket.connect("tcp://127.0.0.1:5555");
cout << "Connecting .... " << endl;
}
void dummyFrontEnd::SendData() {
while ( std::getline(file, line_str) ) {
std::stringstream ss(line_str);
std::string direction;
double tdiff;
int i, _1939, pgn, priority, source, length, data[8];
char J, p, _0, dash, d;
ss >> tdiff >> i >> J >> _1939 >> pgn >> p >> priority >> _0 >> source
>> dash >> direction >> d >> length >> data[0] >> data[1] >> data[2]
>> data[3] >> data[4] >> data[5] >> data[6] >> data[7];
timestamp += tdiff;
while ( gcl_get_time_ms() - start_time <
uint64_t(timestamp * 1000.0) - first_time ) { usleep(1); }
if (arguments.verbose) {
std::cout << timestamp << " " << i << " " << J << " " << _1939 << " "
<< pgn << " " << p << " " << priority << " " << _0 << " " << source
<< " " << dash << " " << direction << " " << d << " " << length
<< " " << data[0] << " " << data[1] << " " << data[2] << " "
<< data[3] << " " << data[4] << " " << data[5] << " " << data[6]
<< " " << data[7] << std::endl;
}
uint64_t timestamp_ms = (uint64_t)(timestamp * 1000.0);
protoTable.add_columnvalues(uint64ToString(timestamp_ms)); /* timestamp */
protoTable.add_columnvalues(intToString(pgn)); /* PGN */
protoTable.add_columnvalues(intToString(priority)); /* Priority */
protoTable.add_columnvalues(intToString(source)); /* Source */
protoTable.add_columnvalues(direction); /* Direction */
protoTable.add_columnvalues(intToString(length)); /* Length */
protoTable.add_columnvalues(intToString(data[0])); /* data1 */
protoTable.add_columnvalues(intToString(data[1])); /* data2 */
protoTable.add_columnvalues(intToString(data[2])); /* data3 */
protoTable.add_columnvalues(intToString(data[3])); /* data4 */
protoTable.add_columnvalues(intToString(data[4])); /* data5 */
protoTable.add_columnvalues(intToString(data[5])); /* data6 */
protoTable.add_columnvalues(intToString(data[6])); /* data7 */
protoTable.add_columnvalues(intToString(data[7])); /* data8 */
zmq::message_t create_values(protoTable.ByteSizeLong()+sizeof(uint16_t));
*((uint16_t*)create_values.data()) = TABLEMSG_ID; // ID
protoTable.SerializeToArray(create_values.data()+sizeof(uint16_t), protoTable.ByteSizeLong());
socket.send(create_values);
protoTable.clear_columnvalues();
usleep(1);
}
}
发布者 1:
dummyFrontEnd::dummyFrontEnd():context(1),socket(context,ZMQ_PUB) {
}
void dummyFrontEnd::Init()
{
socket.connect("tcp://127.0.0.1:5555");
cout << "Connecting .... " << endl;
}
void dummyFrontEnd::SendData()
{
cout << "In SendData" << endl;
while(1) {
canlogreq canLogObj = canlogreq::default_instance();
canLogObj.set_fromhours(11);
canLogObj.set_fromminutes(7);
canLogObj.set_fromseconds(2);
canLogObj.set_fromday(16);
canLogObj.set_frommonth(5);
canLogObj.set_fromyear(2020);
canLogObj.set_tohours(12);
canLogObj.set_tominutes(7);
canLogObj.set_toseconds(4);
canLogObj.set_today(17);
canLogObj.set_tomonth(5);
canLogObj.set_toyear(2020);
zmq::message_t logsnippetmsg(canLogObj.ByteSizeLong() + sizeof(uint16_t));
*((uint16_t*)logsnippetmsg.data()) = 20;
canLogObj.SerializeToArray(logsnippetmsg.data()+sizeof(uint16_t), canLogObj.ByteSizeLong());
socket.send(logsnippetmsg);
usleep(1);
canLogObj.clear_fromhours();
canLogObj.clear_fromminutes();
canLogObj.clear_fromseconds();
canLogObj.clear_fromday();
canLogObj.clear_frommonth();
canLogObj.clear_fromyear();
canLogObj.clear_tohours();
canLogObj.clear_tominutes();
canLogObj.clear_toseconds();
canLogObj.clear_today();
canLogObj.clear_tomonth();
canLogObj.clear_toyear();
}
}
订户:
TransportLayer::TransportLayer():context(1),socket(context,ZMQ_SUB){ }
void TransportLayer::Init()
{
socket.bind("tcp://*:5555");
socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
}
void TransportLayer::Receive()
{
cout << "TransportLayer::Receive " << " I am in server " << endl;
static int count = 1;
// Producer thread.
while ( true ){
zmq::message_t request;
string protoBuf;
socket.recv(&request);
uint16_t id = *((uint16_t*)request.data());
cout << "TransportLayer : " << "request.data: " << request.data() << endl;
cout << "TransportLayer : count " << count << endl; count = count + 1;
cout << "TransportLayer : request.data.size " << request.size() << endl;
protoBuf = std::string(static_cast<char*>(request.data() + sizeof(uint16_t)), request.size() - sizeof(uint16_t));
cout << "ProtoBuf : " << protoBuf << endl;
InterfaceLayer *interfaceLayObj = InterfaceLayer::getInstance();
switch(id) {
case TABLEMSG_ID: cout << "Canlyser" << endl;
interfaceLayObj->ParseProtoBufTable(protoBuf);
break;
case LOGSNIPPET_ID: cout << "LogSnip" << endl;
interfaceLayObj->ParseProtoBufLogSnippet(protoBuf);
interfaceLayObj->logsnippetSignal(); // publish the signal
break;
default: break;
}
usleep(1);
}
}
Q : "how to use multiple Publishers and a single Client, using C < C11?"
因此,QNX 版本没有明确说明,所以让我们进行一般性工作。
如 所述,单个客户端(属于 SUB
-原型)可能 zmq_connect( ? )
,但是以管理一些我不知道的方式为代价,如何让所有其他的,当前的以及任何未来的 PUB
-s zmq_bind()
,然后让 SUB 知道去哪里 zmq_connect( ? )
,以便从新绑定的 PUB
-peer 那里得到一些消息。
因此,让单个 SUB
-agent 执行 zmq_bind()
并让任何当前或未来的 PUB
-s 执行 [=20] 将是一种更聪明的方法=] 当他们来的时候,被引导到单一的、静态的、已知的 SUB
的位置(这并不是说,他们不能使用任何可用的传输-类 - 一个 inproc://
,另一个tcp://
,一些ipc://
,如果QNX允许&系统体系结构需要这样做(并且,显然,假设 SUB
-agent 已经公开了一个正确配置的 AccessNode 来接收此类连接)。
接下来,您的 SUB
-Client 必须配置其订阅过滤主题列表:可能是“接收所有内容!”的命令。 :
...
retCode = zmq_setsockopt( <aSubSocketINSTANCE>, ZMQ_SUBSCRIBE, "", 0 );
assert( retCode == 0 && "FAILED: at ZMQ_SUBSCRIBE order " );
...
鉴于此有效,您的下一个职责是使设置足够稳健(显式 ZMQ_LINGER
设置为 0、访问策略、安全性、扩展资源、L2/L3-network 保护措施等).
您已经完成了对 ZeroMQ 的利用,正好适合您的 QNX 系统设计需求。
我是 ZeroMQ 新手。
我有多个发布者和一个客户。寻求建议以最佳方式实施它。
目前,它对单个客户端和服务器使用回复 - 请求模式;这必须扩展到多个发布者和一个订阅者。
此应用程序将在不支持 C11 的 QNX 系统上 运行,因此 zmq::multipart_t
无济于事。
void TransportLayer::Init()
{
socket.bind( "tcp://*:5555" );
}
void TransportLayer::Receive()
{
while ( true ) {
zmq::message_t request;
string protoBuf;
socket.recv( &request );
uint16_t id = *( (uint16_t*)request.data() );
protoBuf = std::string( static_cast<char*>( request.data()
+ sizeof( uint16_t )
),
request.size() - sizeof( uint16_t )
);
InterfaceLayer::getInstance()->ParseProtoBufTable( protoBuf );
}
Send();
usleep( 1 );
}
void TransportLayer::Send()
{
zmq::message_t reply( 1 );
memcpy( reply.data(), "#", 1 );
socket.send( reply );
}
这是我写的代码,最初设计为只听一个客户端,现在我必须扩展它以听多个客户端。
我尝试使用 zmq::multipart_t
但这需要 C11 支持,但我们使用的 QNX 版本不支持 C11。
我尝试实施建议的解决方案。
我创建了 2 个连接到相同静态位置的发布者。
观察:
我)
执行顺序:
1. 启动订阅者
2。启动 Publisher1(它只发布了一个数据值)
订户错过了接收此数据。
II )
修改 Publisher1 以在 while 循环中发送相同的数据
执行顺序:
1. 启动订阅者
2。已启动 Publisher1
3。已启动 Publsiher2。
现在我看到订阅者正在接收来自两个发布者的数据。
这表明可能会丢失数据。
如何保证数据绝对不丢失?
这是我的源代码:
出版商 2:
dummyFrontEnd::dummyFrontEnd():context(1),socket(context,ZMQ_PUB) {
}
void dummyFrontEnd::Init()
{
socket.connect("tcp://127.0.0.1:5555");
cout << "Connecting .... " << endl;
}
void dummyFrontEnd::SendData() {
while ( std::getline(file, line_str) ) {
std::stringstream ss(line_str);
std::string direction;
double tdiff;
int i, _1939, pgn, priority, source, length, data[8];
char J, p, _0, dash, d;
ss >> tdiff >> i >> J >> _1939 >> pgn >> p >> priority >> _0 >> source
>> dash >> direction >> d >> length >> data[0] >> data[1] >> data[2]
>> data[3] >> data[4] >> data[5] >> data[6] >> data[7];
timestamp += tdiff;
while ( gcl_get_time_ms() - start_time <
uint64_t(timestamp * 1000.0) - first_time ) { usleep(1); }
if (arguments.verbose) {
std::cout << timestamp << " " << i << " " << J << " " << _1939 << " "
<< pgn << " " << p << " " << priority << " " << _0 << " " << source
<< " " << dash << " " << direction << " " << d << " " << length
<< " " << data[0] << " " << data[1] << " " << data[2] << " "
<< data[3] << " " << data[4] << " " << data[5] << " " << data[6]
<< " " << data[7] << std::endl;
}
uint64_t timestamp_ms = (uint64_t)(timestamp * 1000.0);
protoTable.add_columnvalues(uint64ToString(timestamp_ms)); /* timestamp */
protoTable.add_columnvalues(intToString(pgn)); /* PGN */
protoTable.add_columnvalues(intToString(priority)); /* Priority */
protoTable.add_columnvalues(intToString(source)); /* Source */
protoTable.add_columnvalues(direction); /* Direction */
protoTable.add_columnvalues(intToString(length)); /* Length */
protoTable.add_columnvalues(intToString(data[0])); /* data1 */
protoTable.add_columnvalues(intToString(data[1])); /* data2 */
protoTable.add_columnvalues(intToString(data[2])); /* data3 */
protoTable.add_columnvalues(intToString(data[3])); /* data4 */
protoTable.add_columnvalues(intToString(data[4])); /* data5 */
protoTable.add_columnvalues(intToString(data[5])); /* data6 */
protoTable.add_columnvalues(intToString(data[6])); /* data7 */
protoTable.add_columnvalues(intToString(data[7])); /* data8 */
zmq::message_t create_values(protoTable.ByteSizeLong()+sizeof(uint16_t));
*((uint16_t*)create_values.data()) = TABLEMSG_ID; // ID
protoTable.SerializeToArray(create_values.data()+sizeof(uint16_t), protoTable.ByteSizeLong());
socket.send(create_values);
protoTable.clear_columnvalues();
usleep(1);
}
}
发布者 1:
dummyFrontEnd::dummyFrontEnd():context(1),socket(context,ZMQ_PUB) {
}
void dummyFrontEnd::Init()
{
socket.connect("tcp://127.0.0.1:5555");
cout << "Connecting .... " << endl;
}
void dummyFrontEnd::SendData()
{
cout << "In SendData" << endl;
while(1) {
canlogreq canLogObj = canlogreq::default_instance();
canLogObj.set_fromhours(11);
canLogObj.set_fromminutes(7);
canLogObj.set_fromseconds(2);
canLogObj.set_fromday(16);
canLogObj.set_frommonth(5);
canLogObj.set_fromyear(2020);
canLogObj.set_tohours(12);
canLogObj.set_tominutes(7);
canLogObj.set_toseconds(4);
canLogObj.set_today(17);
canLogObj.set_tomonth(5);
canLogObj.set_toyear(2020);
zmq::message_t logsnippetmsg(canLogObj.ByteSizeLong() + sizeof(uint16_t));
*((uint16_t*)logsnippetmsg.data()) = 20;
canLogObj.SerializeToArray(logsnippetmsg.data()+sizeof(uint16_t), canLogObj.ByteSizeLong());
socket.send(logsnippetmsg);
usleep(1);
canLogObj.clear_fromhours();
canLogObj.clear_fromminutes();
canLogObj.clear_fromseconds();
canLogObj.clear_fromday();
canLogObj.clear_frommonth();
canLogObj.clear_fromyear();
canLogObj.clear_tohours();
canLogObj.clear_tominutes();
canLogObj.clear_toseconds();
canLogObj.clear_today();
canLogObj.clear_tomonth();
canLogObj.clear_toyear();
}
}
订户:
TransportLayer::TransportLayer():context(1),socket(context,ZMQ_SUB){ }
void TransportLayer::Init()
{
socket.bind("tcp://*:5555");
socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
}
void TransportLayer::Receive()
{
cout << "TransportLayer::Receive " << " I am in server " << endl;
static int count = 1;
// Producer thread.
while ( true ){
zmq::message_t request;
string protoBuf;
socket.recv(&request);
uint16_t id = *((uint16_t*)request.data());
cout << "TransportLayer : " << "request.data: " << request.data() << endl;
cout << "TransportLayer : count " << count << endl; count = count + 1;
cout << "TransportLayer : request.data.size " << request.size() << endl;
protoBuf = std::string(static_cast<char*>(request.data() + sizeof(uint16_t)), request.size() - sizeof(uint16_t));
cout << "ProtoBuf : " << protoBuf << endl;
InterfaceLayer *interfaceLayObj = InterfaceLayer::getInstance();
switch(id) {
case TABLEMSG_ID: cout << "Canlyser" << endl;
interfaceLayObj->ParseProtoBufTable(protoBuf);
break;
case LOGSNIPPET_ID: cout << "LogSnip" << endl;
interfaceLayObj->ParseProtoBufLogSnippet(protoBuf);
interfaceLayObj->logsnippetSignal(); // publish the signal
break;
default: break;
}
usleep(1);
}
}
Q : "how to use multiple Publishers and a single Client, using C < C11?"
因此,QNX 版本没有明确说明,所以让我们进行一般性工作。
如 SUB
-原型)可能 zmq_connect( ? )
,但是以管理一些我不知道的方式为代价,如何让所有其他的,当前的以及任何未来的 PUB
-s zmq_bind()
,然后让 SUB 知道去哪里 zmq_connect( ? )
,以便从新绑定的 PUB
-peer 那里得到一些消息。
因此,让单个 SUB
-agent 执行 zmq_bind()
并让任何当前或未来的 PUB
-s 执行 [=20] 将是一种更聪明的方法=] 当他们来的时候,被引导到单一的、静态的、已知的 SUB
的位置(这并不是说,他们不能使用任何可用的传输-类 - 一个 inproc://
,另一个tcp://
,一些ipc://
,如果QNX允许&系统体系结构需要这样做(并且,显然,假设 SUB
-agent 已经公开了一个正确配置的 AccessNode 来接收此类连接)。
接下来,您的 SUB
-Client 必须配置其订阅过滤主题列表:可能是“接收所有内容!”的命令。 :
...
retCode = zmq_setsockopt( <aSubSocketINSTANCE>, ZMQ_SUBSCRIBE, "", 0 );
assert( retCode == 0 && "FAILED: at ZMQ_SUBSCRIBE order " );
...
鉴于此有效,您的下一个职责是使设置足够稳健(显式 ZMQ_LINGER
设置为 0、访问策略、安全性、扩展资源、L2/L3-network 保护措施等).
您已经完成了对 ZeroMQ 的利用,正好适合您的 QNX 系统设计需求。