以 TCP 作为传输层的 ZeroMQ PUB/SUB 模式的奇怪行为

Strange behavior of the ZeroMQ PUB/SUB pattern with TCP as transport layer

为了设计我们的API/messages,我用我们的数据做了一些初步测试:

Protobuf V3 消息:

message TcpGraphes {
    uint32 flowId                   = 1;
    repeated uint64 curTcpWinSizeUl = 2; // max 3600 elements
    repeated uint64 curTcpWinSizeDl = 3; // max 3600 elements
    repeated uint64 retransUl       = 4; // max 3600 elements
    repeated uint64 retransDl       = 5; // max 3600 elements
    repeated uint32 rtt             = 6; // max 3600 elements
}

将消息构建为多部分消息,以便为客户端添加过滤器功能

测试了 10 个 python 客户端:5 运行 在同一台 PC(本地主机)上,5 运行 在外部 PC 上。 使用的协议是 TCP。每秒发送大约 200 条消息

结果:

  1. 本地客户端正在工作:他们收到每条消息
  2. 远程客户端丢失了一些消息(吞吐量似乎被服务器限制为每个客户端 1Mbit/s)

服务器代码(C++):

// zeroMQ init
zmq_ctx = zmq_ctx_new();
zmq_pub_sock = zmq_socket(zmq_ctx, ZMQ_PUB);
zmq_bind(zmq_pub_sock, "tcp://*:5559");

每秒循环发送约200条消息:

std::string serStrg;
tcpG.SerializeToString(&serStrg);
// first part identifier: [flowId]tcpAnalysis.TcpGraphes
std::stringstream id;
id << It->second->first << tcpG.GetTypeName();
zmq_send(zmq_pub_sock, id.str().c_str(), id.str().length(), ZMQ_SNDMORE);
zmq_send(zmq_pub_sock, serStrg.c_str(), serStrg.length(), 0);

客户代码(python):

ctx = zmq.Context()
sub = ctx.socket(zmq.SUB)
sub.setsockopt(zmq.SUBSCRIBE, '')
sub.connect('tcp://x.x.x.x:5559')
print ("Waiting for data...")
while True:
    message = sub.recv() # first part (filter part, eg:"134tcpAnalysis.TcpGraphes")
    print ("Got some data:",message)
    message = sub.recv() # second part (protobuf bin)

我们查看了 PCAP,服务器没有使用全部可用带宽,我可以添加一些新用户,删除一些现有用户,每个远程用户获得 "only" 1Mbit/s。

我测试了两台 PC 之间的 Iperf3 TCP 连接,我达到了 60Mbit/s。

运行 python 客户端的 PC 最后有大约 30% CPU。 为了避免打印输出,我已经最小化了客户端 运行 的控制台,但它没有效果。

这是 TCP 传输层(PUB/SUB 模式)的正常行为吗?这是否意味着我应该使用 EPGM 协议?

配置:

出于表现的兴趣?

好吧,让我们先把资源用得更充分一点:

// //////////////////////////////////////////////////////
// zeroMQ init
// //////////////////////////////////////////////////////

zmq_ctx = zmq_ctx_new();

int aRetCODE = zmq_ctx_set( zmq_ctx, ZMQ_IO_THREADS, 10 );

assert( 0 == aRetCODE );

zmq_pub_sock = zmq_socket(  zmq_ctx, ZMQ_PUB );

    aRetCODE = zmq_setsockopt( zmq_pub_sock, ZMQ_AFFINITY, 1023 );
    //                                                     ^^^^
    //                                                     ||||
    //                                 (:::::::::::)-------++++
    // >>> print ( "[{0: >16b}]".format( 2**10 - 1 ) ).replace( " ", "." )
    // [......1111111111]
    //        ||||||||||
    //        |||||||||+---- IO-thread 0
    //        ||||||||+----- IO-thread 1
    //        |......+------ IO-thread 2
    //        ::             :         :
    //        |+------------ IO-thread 8
    //        +------------- IO-thread 9
    //
    // API-defined AFFINITY-mapping

具有较新 API 的非 windows 平台也可以触及调度程序细节并更好地调整 O/S-side 优先级。


联网?

好吧,让我们先把资源用得更充分一点:

    aRetCODE = zmq_setsockopt( zmq_pub_sock, ZMQ_TOS, <_a_HIGH_PRIORITY_ToS#_> );

将整个基础架构转换为 epgm:// ?

好吧,如果有人希望进行实验并获得执行该 E2E 所需的资源。