以 TCP 作为传输层的 ZeroMQ PUB/SUB 模式的奇怪行为
Strange behavior of the ZeroMQ PUB/SUB pattern with TCP as transport layer
为了设计我们的API/messages,我用我们的数据做了一些初步测试:
Protobuf V3 消息:
message TcpGraphes {
uint32 flowId = 1;
repeated uint64 curTcpWinSizeUl = 2; // max 3600 elements
repeated uint64 curTcpWinSizeDl = 3; // max 3600 elements
repeated uint64 retransUl = 4; // max 3600 elements
repeated uint64 retransDl = 5; // max 3600 elements
repeated uint32 rtt = 6; // max 3600 elements
}
将消息构建为多部分消息,以便为客户端添加过滤器功能
测试了 10 个 python 客户端:5 运行 在同一台 PC(本地主机)上,5 运行 在外部 PC 上。
使用的协议是 TCP。每秒发送大约 200 条消息。
结果:
- 本地客户端正在工作:他们收到每条消息
- 远程客户端丢失了一些消息(吞吐量似乎被服务器限制为每个客户端 1Mbit/s)
服务器代码(C++):
// zeroMQ init
zmq_ctx = zmq_ctx_new();
zmq_pub_sock = zmq_socket(zmq_ctx, ZMQ_PUB);
zmq_bind(zmq_pub_sock, "tcp://*:5559");
每秒循环发送约200条消息:
std::string serStrg;
tcpG.SerializeToString(&serStrg);
// first part identifier: [flowId]tcpAnalysis.TcpGraphes
std::stringstream id;
id << It->second->first << tcpG.GetTypeName();
zmq_send(zmq_pub_sock, id.str().c_str(), id.str().length(), ZMQ_SNDMORE);
zmq_send(zmq_pub_sock, serStrg.c_str(), serStrg.length(), 0);
客户代码(python):
ctx = zmq.Context()
sub = ctx.socket(zmq.SUB)
sub.setsockopt(zmq.SUBSCRIBE, '')
sub.connect('tcp://x.x.x.x:5559')
print ("Waiting for data...")
while True:
message = sub.recv() # first part (filter part, eg:"134tcpAnalysis.TcpGraphes")
print ("Got some data:",message)
message = sub.recv() # second part (protobuf bin)
我们查看了 PCAP,服务器没有使用全部可用带宽,我可以添加一些新用户,删除一些现有用户,每个远程用户获得 "only" 1Mbit/s。
我测试了两台 PC 之间的 Iperf3 TCP 连接,我达到了 60Mbit/s。
运行 python 客户端的 PC 最后有大约 30% CPU。
为了避免打印输出,我已经最小化了客户端 运行 的控制台,但它没有效果。
这是 TCP 传输层(PUB/SUB 模式)的正常行为吗?这是否意味着我应该使用 EPGM 协议?
配置:
- windows 服务器 xp
- windows 7 用于 python 远程客户端
- 使用 zmq 版本 4.0.4
出于表现的兴趣?
好吧,让我们先把资源用得更充分一点:
// //////////////////////////////////////////////////////
// zeroMQ init
// //////////////////////////////////////////////////////
zmq_ctx = zmq_ctx_new();
int aRetCODE = zmq_ctx_set( zmq_ctx, ZMQ_IO_THREADS, 10 );
assert( 0 == aRetCODE );
zmq_pub_sock = zmq_socket( zmq_ctx, ZMQ_PUB );
aRetCODE = zmq_setsockopt( zmq_pub_sock, ZMQ_AFFINITY, 1023 );
// ^^^^
// ||||
// (:::::::::::)-------++++
// >>> print ( "[{0: >16b}]".format( 2**10 - 1 ) ).replace( " ", "." )
// [......1111111111]
// ||||||||||
// |||||||||+---- IO-thread 0
// ||||||||+----- IO-thread 1
// |......+------ IO-thread 2
// :: : :
// |+------------ IO-thread 8
// +------------- IO-thread 9
//
// API-defined AFFINITY-mapping
具有较新 API 的非 windows 平台也可以触及调度程序细节并更好地调整 O/S-side 优先级。
联网?
好吧,让我们先把资源用得更充分一点:
aRetCODE = zmq_setsockopt( zmq_pub_sock, ZMQ_TOS, <_a_HIGH_PRIORITY_ToS#_> );
将整个基础架构转换为 epgm://
?
好吧,如果有人希望进行实验并获得执行该 E2E 所需的资源。
为了设计我们的API/messages,我用我们的数据做了一些初步测试:
Protobuf V3 消息:
message TcpGraphes {
uint32 flowId = 1;
repeated uint64 curTcpWinSizeUl = 2; // max 3600 elements
repeated uint64 curTcpWinSizeDl = 3; // max 3600 elements
repeated uint64 retransUl = 4; // max 3600 elements
repeated uint64 retransDl = 5; // max 3600 elements
repeated uint32 rtt = 6; // max 3600 elements
}
将消息构建为多部分消息,以便为客户端添加过滤器功能
测试了 10 个 python 客户端:5 运行 在同一台 PC(本地主机)上,5 运行 在外部 PC 上。 使用的协议是 TCP。每秒发送大约 200 条消息。
结果:
- 本地客户端正在工作:他们收到每条消息
- 远程客户端丢失了一些消息(吞吐量似乎被服务器限制为每个客户端 1Mbit/s)
服务器代码(C++):
// zeroMQ init
zmq_ctx = zmq_ctx_new();
zmq_pub_sock = zmq_socket(zmq_ctx, ZMQ_PUB);
zmq_bind(zmq_pub_sock, "tcp://*:5559");
每秒循环发送约200条消息:
std::string serStrg;
tcpG.SerializeToString(&serStrg);
// first part identifier: [flowId]tcpAnalysis.TcpGraphes
std::stringstream id;
id << It->second->first << tcpG.GetTypeName();
zmq_send(zmq_pub_sock, id.str().c_str(), id.str().length(), ZMQ_SNDMORE);
zmq_send(zmq_pub_sock, serStrg.c_str(), serStrg.length(), 0);
客户代码(python):
ctx = zmq.Context()
sub = ctx.socket(zmq.SUB)
sub.setsockopt(zmq.SUBSCRIBE, '')
sub.connect('tcp://x.x.x.x:5559')
print ("Waiting for data...")
while True:
message = sub.recv() # first part (filter part, eg:"134tcpAnalysis.TcpGraphes")
print ("Got some data:",message)
message = sub.recv() # second part (protobuf bin)
我们查看了 PCAP,服务器没有使用全部可用带宽,我可以添加一些新用户,删除一些现有用户,每个远程用户获得 "only" 1Mbit/s。
我测试了两台 PC 之间的 Iperf3 TCP 连接,我达到了 60Mbit/s。
运行 python 客户端的 PC 最后有大约 30% CPU。 为了避免打印输出,我已经最小化了客户端 运行 的控制台,但它没有效果。
这是 TCP 传输层(PUB/SUB 模式)的正常行为吗?这是否意味着我应该使用 EPGM 协议?
配置:
- windows 服务器 xp
- windows 7 用于 python 远程客户端
- 使用 zmq 版本 4.0.4
出于表现的兴趣?
好吧,让我们先把资源用得更充分一点:
// //////////////////////////////////////////////////////
// zeroMQ init
// //////////////////////////////////////////////////////
zmq_ctx = zmq_ctx_new();
int aRetCODE = zmq_ctx_set( zmq_ctx, ZMQ_IO_THREADS, 10 );
assert( 0 == aRetCODE );
zmq_pub_sock = zmq_socket( zmq_ctx, ZMQ_PUB );
aRetCODE = zmq_setsockopt( zmq_pub_sock, ZMQ_AFFINITY, 1023 );
// ^^^^
// ||||
// (:::::::::::)-------++++
// >>> print ( "[{0: >16b}]".format( 2**10 - 1 ) ).replace( " ", "." )
// [......1111111111]
// ||||||||||
// |||||||||+---- IO-thread 0
// ||||||||+----- IO-thread 1
// |......+------ IO-thread 2
// :: : :
// |+------------ IO-thread 8
// +------------- IO-thread 9
//
// API-defined AFFINITY-mapping
具有较新 API 的非 windows 平台也可以触及调度程序细节并更好地调整 O/S-side 优先级。
联网?
好吧,让我们先把资源用得更充分一点:
aRetCODE = zmq_setsockopt( zmq_pub_sock, ZMQ_TOS, <_a_HIGH_PRIORITY_ToS#_> );
将整个基础架构转换为 epgm://
?
好吧,如果有人希望进行实验并获得执行该 E2E 所需的资源。