ZMQ - Packet Loss
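I plan to implement a Pub/Sub model with ZMQ and am trying out the ZMQ example (weather) programs. [Note: I have made some modifications to the programs]
Here are my observations about ZMQ:
- After setting up the server and running the programs a few times, I found a large amount of packet loss.
- I'm quite sure the underlying hardware is good enough in terms of processor, memory and network bandwidth.
I have even tried a few of the recommended TCP tunings.
So now I'm wondering whether I'm missing any optimization settings, or any other recommendations that should be implemented to meet ZeroMQ's performance expectations.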
请找到Pastebin的link
Publisher Sample Code:
//
// Weather update server in C++
// Binds PUB socket to tcp://*:5556
// Publishes random weather updates
//
// Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com>
//
#include <cppzmq-master/zmq.hpp>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <chrono>     // std::chrono clocks used for the timing below
#include <iostream>
#include <unistd.h>
#if (defined (WIN32))
#include <zhelpers.hpp>
#endif

// Random number in [0, num)
#define within(num) (unsigned long) ((double) (num) * random () / (RAND_MAX + 1.0))

int main (int argc, char *argv[]) {
    if (argc < 2) {
        std::cerr << "usage: " << argv[0] << " <ticks_count>" << std::endl;
        return 1;
    }
    unsigned long ticks_count = strtoul(argv[1], NULL, 10);

    // Prepare our context and publisher
    zmq::context_t context (1);
    zmq::socket_t publisher (context, ZMQ_PUB);
    publisher.bind("tcp://*:5556");
    //publisher.bind("ipc://weather.ipc"); // Not usable on Windows.

    // Initialize random number generator
    srandom ((unsigned) time (NULL));
    unsigned long counter = 0;

    // Give the subscriber time to connect: a PUB socket drops
    // every message while no subscriber is connected
    zmq_sleep(5);

    auto start = std::chrono::steady_clock::now();
    for (unsigned long i = 1; i <= ticks_count; i++) {
        // Get values that will fool the boss
        unsigned long zipcode = ++counter;            // doubles as a sequence number
        long temperature = (long) within (215) - 80;  // signed: may be negative
        unsigned long relhumidity = within (50) + 10;

        // Send message to all subscribers
        zmq::message_t message(20);
        snprintf ((char *) message.data(), 20,
                  "%lu %ld %lu", zipcode, temperature, relhumidity);
        publisher.send(message);
        // usleep(0);
        std::cout << "i, zipcode: " << i << ", " << zipcode << std::endl;
        // Code to print missing packets
        if (i != zipcode)
            std::cout << "Missing Packet - Expected: " << i << ", Sent: " << zipcode << std::endl;
    }
    auto stop = std::chrono::steady_clock::now();
    long long diff = std::chrono::duration_cast<std::chrono::nanoseconds>(stop - start).count();
    std::cout << "Time Taken (nanoseconds): " << diff << std::endl;

    // Keep the process (and its queues) alive while subscribers catch up
    zmq_sleep(300);
    return 0;
}
Commands:
Build =>
g++ pub.cpp -o pub -lstdc++ -lzmq
Run =>
./pub 22686
Subscriber Sample Code:
//
// Weather update client in C++
// Connects SUB socket to tcp://localhost:5556
// Collects weather updates and finds avg temp in zipcode
//
// Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com>
//
#include <cppzmq-master/zmq.hpp>
#include <chrono>     // std::chrono clocks used for the timing below
#include <cstdlib>    // strtoul
#include <cstring>    // strlen
#include <iostream>
#include <sstream>

int main (int argc, char *argv[])
{
    if (argc < 2) {
        std::cerr << "usage: " << argv[0] << " <ticks_count>" << std::endl;
        return 1;
    }
    unsigned long ticks_count = strtoul(argv[1], NULL, 10);

    zmq::context_t context (1);
    // Socket to talk to server
    std::cout << "Collecting updates from weather server…\n" << std::endl;
    zmq::socket_t subscriber (context, ZMQ_SUB);
    subscriber.connect("tcp://localhost:5556");
    // Subscribe to everything (empty filter); the original example
    // subscribed to a zipcode prefix, default NYC, "10001 "
    const char *filter = "";//(argc > 1)? argv [1]: "10001 ";
    subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter));
    std::cout << "Ticks Count: " << ticks_count << std::endl;

    // Process ticks_count updates
    unsigned long update_nbr;
    long total_temp = 0;
    auto start = std::chrono::steady_clock::now();
    for (update_nbr = 1; update_nbr <= ticks_count; update_nbr++) {
        zmq::message_t update;
        unsigned long zipcode, relhumidity;
        long temperature;                        // signed, as sent by the publisher
        subscriber.recv(&update);
        // Relies on the '\0' that snprintf() writes inside the 20-byte message
        std::istringstream iss(static_cast<char*>(update.data()));
        iss >> zipcode >> temperature >> relhumidity;
        total_temp += temperature;
        std::cout << "update_nbr, zipcode: " << update_nbr << ", " << zipcode << std::endl;
        // Code to print missing packets
        if (update_nbr != zipcode)
            std::cout << "Missing Packet - Expected: " << update_nbr << ", Received: " << zipcode << std::endl;
    }
    auto stop = std::chrono::steady_clock::now();
    long long diff = std::chrono::duration_cast<std::chrono::nanoseconds>(stop - start).count();
    std::cout << "Time Taken (nanoseconds): " << diff << std::endl;
    std::cout << "Program Complete" << std::endl;
    return 0;
}
Commands:
Build =>
g++ sub.cpp -o sub -lstdc++ -lzmq
Run =>
./sub 22686
Welcome to the Zen-of-Zero: what you observed is fully documented behavior.
Reason:
As the zmq_socket(3) documentation states for ZMQ_PUB: "When a ZMQ_PUB socket enters the mute state due to having reached the high water mark for a subscriber, then any messages that would be sent to the subscriber in question shall instead be dropped until the mute state ends. The zmq_send() function shall never block for this socket type."
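To make the mute-state rule concrete, here is a toy model in plain C++ (no ZeroMQ involved; `ToyPubPipe`, `simulate_burst` and the rates are hypothetical). It models one subscriber's pipe with a high water mark of 1000 messages, which matches libzmq's default ZMQ_SNDHWM. The publisher sends in a tight loop while the subscriber drains only one message for every ten sent. The model only illustrates the drop-instead-of-block behaviour quoted above. Real libzmq queues also involve the receive-side HWM and kernel buffers.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Toy model (not libzmq code) of one subscriber's outbound pipe on a PUB
// socket. When the pipe holds `hwm` messages it is "mute": send() drops the
// message and returns immediately instead of blocking.
class ToyPubPipe {
public:
    explicit ToyPubPipe(std::size_t hwm) : hwm_(hwm) {}

    // Returns true if the message was queued, false if it was dropped.
    bool send(unsigned long msg) {
        if (hwm_ != 0 && queue_.size() >= hwm_) {  // hwm 0 models "no limit"
            ++dropped_;
            return false;
        }
        queue_.push_back(msg);
        return true;
    }

    // The subscriber takes one message off the pipe, if there is one.
    void drain_one() {
        if (!queue_.empty()) queue_.pop_front();
    }

    std::size_t queued() const { return queue_.size(); }
    std::size_t dropped() const { return dropped_; }

private:
    std::size_t hwm_;
    std::deque<unsigned long> queue_;
    std::size_t dropped_ = 0;
};

struct BurstResult {
    std::size_t queued;
    std::size_t dropped;
};

// The publisher sends `ticks` messages back to back; the subscriber manages
// to drain only one message for every `drain_every` messages sent.
BurstResult simulate_burst(unsigned long ticks, std::size_t hwm,
                           unsigned long drain_every) {
    assert(drain_every > 0);
    ToyPubPipe pipe(hwm);
    for (unsigned long i = 1; i <= ticks; ++i) {
        pipe.send(i);
        if (i % drain_every == 0) pipe.drain_one();
    }
    return {pipe.queued(), pipe.dropped()};
}
```

With the question's 22686 messages and an HWM of 1000, this model drops 19418 messages and leaves 1000 queued. With the HWM set to 0 ("no limit"), nothing is dropped, but 20418 messages pile up in memory instead.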
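Solution:
Either provide adequate resource sizing, or design a two-way handshake that adaptively throttles the pace of the (otherwise blind) publisher.send(message) calls. The handshake matches that pace to the ability of the processing side to keep the flow smooth and synchronized.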
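The handshake option can be sketched as credit-based flow control. This is again a plain C++ simulation rather than ZeroMQ code. The subscriber grants the publisher a window of credits, the publisher sends only while it holds credit, and every consumed message returns one credit. How the credits would travel in a real deployment (for example over a second, reverse-direction socket) is left open as an assumption. `simulate_credit_flow` and its parameters are hypothetical.

```cpp
#include <cassert>
#include <deque>

// Toy model of credit-based flow control (an assumed design, not a ZeroMQ
// API). Invariant: messages in flight + credit held == window, so the queue
// can never grow past `window` and nothing has to be dropped.
struct CreditResult {
    unsigned long delivered = 0;        // messages consumed by the subscriber
    unsigned long max_queue = 0;        // largest in-flight backlog observed
    unsigned long publisher_waits = 0;  // steps the publisher spent without credit
};

CreditResult simulate_credit_flow(unsigned long ticks, unsigned long window,
                                  unsigned long consume_every) {
    assert(window > 0 && consume_every > 0);
    CreditResult r;
    std::deque<unsigned long> in_flight;  // messages sent but not yet consumed
    unsigned long credit = window;        // initial credit granted by the subscriber
    unsigned long next = 1;               // next sequence number to publish

    for (unsigned long step = 1; r.delivered < ticks; ++step) {
        // Publisher: send one message per step, but only while holding credit;
        // without credit it waits instead of sending blindly.
        if (next <= ticks) {
            if (credit > 0) {
                in_flight.push_back(next++);
                --credit;
            } else {
                ++r.publisher_waits;
            }
        }
        if (in_flight.size() > r.max_queue) r.max_queue = in_flight.size();

        // Subscriber: consumes one message every `consume_every` steps and
        // hands one credit back (the reverse half of the handshake).
        if (step % consume_every == 0 && !in_flight.empty()) {
            assert(in_flight.front() == r.delivered + 1);  // in order, no gaps
            in_flight.pop_front();
            ++r.delivered;
            ++credit;
        }
    }
    return r;
}
```

Suppose the same rates as a blind burst apply: 22686 messages, a 1000-credit window, and one message consumed per ten steps. Under those rates all 22686 messages are delivered in order, and the backlog never exceeds the 1000-message window. The cost shows up as publisher waiting time instead of dropped messages.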
PUB-side: add
int hwm_nMSGs = 0;                                      // ZMQ_SNDHWM: a value of zero means no limit
int buf_SizeB = static_cast<int>( 100 * ticks_count );  // ZMQ_SNDBUF: bytes (option type is int)
publisher.setsockopt( ZMQ_SNDBUF, &buf_SizeB, sizeof ( buf_SizeB ) );
publisher.setsockopt( ZMQ_SNDHWM, &hwm_nMSGs, sizeof ( hwm_nMSGs ) );
// _________________________________________________________ CONFIG before .bind()
publisher.bind( "tcp://*:5556" );
...
// ___________________________________________________________________ .unbind()
publisher.unbind( "tcp://*:5556" );  // .unbind()
publisher.close();                   // .close() every socket first, or .term() blocks
context.close();                     // .term()
SUB-side: add
int hwm_nMSGs = static_cast<int>( 3 * ticks_count );    // ZMQ_RCVHWM: messages (option type is int)
int buf_SizeB = static_cast<int>( 100 * ticks_count );  // ZMQ_RCVBUF: bytes (option type is int)
subscriber.setsockopt( ZMQ_RCVHWM, &hwm_nMSGs, sizeof ( hwm_nMSGs ) );
subscriber.setsockopt( ZMQ_RCVBUF, &buf_SizeB, sizeof ( buf_SizeB ) );
// _________________________________________________________ CONFIG before .connect()
subscriber.connect( "tcp://localhost:5556" );
...
// ___________________________________________________________________ .close()
subscriber.close();                  // .close()
context.close();                     // .term()
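The `zmq_setsockopt()` documentation types ZMQ_SNDHWM, ZMQ_RCVHWM, ZMQ_SNDBUF and ZMQ_RCVBUF as `int` options. The value passed must therefore be an `int` (with `sizeof(int)`), and `100 * ticks_count` can outgrow that type on large runs. The hypothetical helper below computes the sizes used above and refuses values that do not fit. Those sizes are 3 messages of RCVHWM and 100 bytes of buffer per tick; both are this answer's choices, not ZeroMQ requirements.

```cpp
#include <cassert>
#include <climits>
#include <stdexcept>

// Converts a computed size to the `int` that zmq_setsockopt() expects for
// the HWM and kernel-buffer options, throwing instead of silently wrapping.
int to_int_option(unsigned long value) {
    if (value > static_cast<unsigned long>(INT_MAX))
        throw std::out_of_range("socket option value does not fit in int");
    return static_cast<int>(value);
}

// Hypothetical sizing derived from the expected number of messages.
struct SocketSizing {
    int sub_rcvhwm;  // messages, for ZMQ_RCVHWM on the SUB side
    int buf_bytes;   // bytes, for ZMQ_SNDBUF (PUB) and ZMQ_RCVBUF (SUB)
};

SocketSizing size_for(unsigned long ticks_count) {
    // 0 would turn the HWM into "no limit", which is not what is meant here.
    assert(ticks_count > 0);
    return SocketSizing{to_int_option(3UL * ticks_count),
                        to_int_option(100UL * ticks_count)};
}
```

For the 22686-tick run this yields ZMQ_RCVHWM = 68058 and 2268600-byte buffers. On Linux the kernel may cap SO_SNDBUF / SO_RCVBUF requests at net.core.wmem_max / net.core.rmem_max, so the effective buffer can be smaller than requested.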
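Last but not least, never assume a natural order of network packet arrival. Multipath-routed networks can, and do, deliver out of order. The final condition if(update_nbr != zipcode) will therefore flag every out-of-order delivery, even when no message was dropped on either the { PUB | SUB } side for lack of resources or lost in transit. Note also that update_nbr is a local loop counter. Once a single message is missing, update_nbr and zipcode stay misaligned, so every subsequent message is reported as a "Missing Packet" too.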
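A receiver-side check that copes with both effects can track the highest sequence number seen instead of comparing against a local loop counter. The sketch below uses a hypothetical `SeqTracker` and assumes sequence numbers start at 1, as `zipcode = ++counter` does in the publisher. It counts the size of each forward gap as missing, and anything at or below the highest value seen as out of order or duplicate. One lost message is then reported once rather than on every later line.

```cpp
#include <cassert>

// Receiver-side loss/reorder accounting keyed on the sender's sequence number
// (here the zipcode field) rather than on a local loop counter.
struct SeqStats {
    unsigned long received = 0;
    unsigned long missing = 0;       // total size of forward gaps seen so far
    unsigned long out_of_order = 0;  // arrivals at or below the highest seen
};

class SeqTracker {
public:
    void on_message(unsigned long seq) {
        assert(seq != 0 && "sequence numbers are assumed to start at 1");
        ++stats_.received;
        if (seq > highest_) {
            stats_.missing += seq - highest_ - 1;  // size of the gap, if any
            highest_ = seq;
        } else {
            // Late or duplicate arrival. A late message was already counted
            // in `missing` when its gap opened; this sketch does not
            // un-count it.
            ++stats_.out_of_order;
        }
    }
    const SeqStats& stats() const { return stats_; }

private:
    unsigned long highest_ = 0;
    SeqStats stats_;
};
```

In the subscriber loop, `tracker.on_message(zipcode)` would take the place of the `if (update_nbr != zipcode)` check. Consider the stream 1, 2, 3, 5, 6, 9, 10. The tracker reports 3 missing, whereas the loop-counter comparison prints four "Missing Packet" lines.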
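There is no indication of which ZeroMQ version / language binding / wrapper your code is using (or will use). Note that the default value of ZMQ_LINGER has differed across them, and it may cause your code to hang. The code can wait indefinitely if not all messages have been transmitted and no receiving process ever retrieves those pending messages from the queue. The best way to avoid this is to always explicitly control the ZMQ_LINGER setting of every zmq_socket instance, and to release resources gracefully via .unbind(), .close() and .term().
This is fair and proper practice for a robust and clean distributed-systems design.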