ZMQ Multi-part flusher:你能在不读取所有部分的情况下获得通过 ZeroMQ 接收的多部分消息中的总部分数吗?

ZMQ Multi-part flusher: Can you get the total number of parts in a multi-part message received via ZeroMQ without reading them all?

我正在使用多部分消息传递在 C 中使用 ZeroMQ 实现一个简单的 REQ-REP 模式。除了少数例外,我的大多数消息都严格分为 4 个部分(来回)。为了执行该规则,我需要确定收到的多部分消息的总部分数。知道它是否 <= 4 很容易。这是我的接收函数:

#define BUFMAX  64 // Maximum size of text buffers
#define BUFRCV  63 // Maximum reception size of text buffers (reserve 1 space to add a terminal '[=10=]')

char mpartstr[4][BUFMAX];


int recv_multi(void *socket,int *aremore)

// Receive upto the first 4 parts of a multipart message into mpartstr[][].
// Returns the number of parts read (upto 4) or <0 if there is an error.
// Returns -1 if there is an error with a zmq function.
// It sets aremore=1 if there are still more parts to read after the fourth
// part (or aremore=0 if not).
{
 int len,rc,rcvmore,pdx,wrongpard=0;
 size_t rcvmore_size = sizeof(rcvmore);

 pdx=0;
 len=zmq_recv(socket, mpartstr[pdx], BUFRCV, 0);
 if(len==-1) return len;

 mpartstr[pdx][len]='[=10=]';
 rc=zmq_getsockopt(socket,ZMQ_RCVMORE,&rcvmore,&rcvmore_size); if(rc) return -1;

 pdx++;
 if(rcvmore==0){*aremore=0; return pdx;}

 while(rcvmore){
   len=zmq_recv (socket, mpartstr[pdx], BUFRCV, 0); if(len==-1) return len; mpartstr[pdx][len]='[=10=]';
   rc=zmq_getsockopt(socket,ZMQ_RCVMORE,&rcvmore,&rcvmore_size); if(rc) return -1; 
   pdx++;
   if(pdx==4) break;
 }

 *aremore=rcvmore;
 return pdx;
}

一切顺利。但是现在在我的 main() 函数中,我通过查看 aremore 的值来检查是否还有更多部分。在那些我不期望更多的情况下,我会向发件人发送一条错误消息,但我发现如果我不阅读多部分消息的所有部分(它读取剩余的部分),ZeroMQ 不喜欢它下次我调用 zmq_recv() 函数时,这个旧的多部分消息的一部分,即使在我发送消息并期望新的干净的多部分响应之后也是如此。

所以我真正需要的是一种 "flush" 函数来清除包含超过 4 个我想丢弃的部分的消息的剩余部分。到目前为止,我必须这样做的唯一方法是像这样一个丑陋的任意蛮力耗尽函数(aremore 的开始值为 1 - 它由前一个函数设置):

int recv_exhaust(void *socket,int *aremore)

// Receive the remainder of a multipart message and discard the contents.
// Use this to clean out a multi-part 'inbox' from a wrongly sent message.
// Returns 0 on success
// Returns -1 on zmq function failure
// Returns -2 on failure to exhaust even after 1000 parts.
{                                                          
 int len,rc,rcvmore,pdx;
 size_t rcvmore_size = sizeof(rcvmore);

 pdx=1;
 rcvmore=*aremore;

 while(rcvmore){
   len=zmq_recv(socket, mpartstr[0], BUFRCV, 0); if(len==-1) return len;
   rc=zmq_getsockopt(socket,ZMQ_RCVMORE,&rcvmore,&rcvmore_size); if(rc) return -1; 
   pdx++;
   if(pdx>1000) return -2;
 }

 return 0;
}

如果没有专门的 'flusher' API 那么至少我可以摆脱我任意 1000 条消息的限制,如果我有一些方法可以提前知道有多少部分(总共)给定的多部分消息有。 ZeroMQ 肯定知道这一点,因为多部分消息是作为一个整体块发送的。谁能指出我找到该信息的方法?或者那里有合适的 'flusher' function/method 吗? (对于标准 C,请不要使用 C++/C# 等)。提前致谢。

Q : Can anyone point me to the way to find that info?

是的。

Q : is there a proper 'flusher' function/method out there?

是和否:

就 ZeroMQ v2.x 到 v4.3.1,没有明确的 API-调用 "flusher"

ZeroMQ 设计提供的低延迟智能消息传递的美丽和强大是建立在精心设计的零禅基础上的:总是更喜欢性能而不是舒适 - 作为零复制、零保修和其他范例建议。

天真(我忍受了很多痛苦,将其简化为诉诸于使用原始阻塞 recv() ...)"flusher" 必须一直走到 ZMQ_RCVMORE 不再对 "beyond" 多帧最后消息的部分进行 NACK 标记(或者 zmq_msg_more() == 0 确实符合相同的要求)。尽管如此,所有这些操作都只是一个指针处理,没有数据从 RAM 中获取 "moved/copied/read",只是指针被分配,所以它确实既快又 I/O-efficient :

int    more;
size_t more_size = sizeof ( more );
do {
      zmq_msg_t part;                       /* Create an empty ØMQ message to hold the message part */
      int rc = zmq_msg_init (&part);           assert (rc == 0 && "MSG_INIT failed" );
      rc = zmq_msg_recv (&part, socket, 0); /* Block until a message is available to be received from socket */
                                               assert (rc != -1 && "MSG_RECV failed" );
                                            /* Determine if more message parts are to follow */
      rc = zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
                                               assert (rc == 0 && "GETSOCKOPT failed" );
      zmq_msg_close (&part);
      } while (more);

鉴于 RFC-23/ZMTP 记录的属性,只有少数(线级遥测编码)保证:

1) 所有消息得到 sent/delivered:

  • 原子地(所有帧都无错误二进制相同,或者根本none)
  • 最多一次(每个相关同行)
  • 顺序

2) 多部分消息另外获得内部(带内)遥测 "advice" 状态

  • 有点标记 状态 { 1: more-frames-follow| 0: no-more-frames }
  • 有点标记 大小类型 { 0: 8b-direct-octet | 1: 64b-"network-endian"-coded }
  • 一个尺寸-建议{ 0~255: direct-size | 0~2^63-1: 64b-"network-endian"-coded-size }

已记录 zmq_recv() API 在这方面同样相当明确:

Multi-part messages

A ØMQ message is composed of 1 or more message parts. Each message part is an independent zmq_msg_t in its own right. ØMQ ensures atomic delivery of messages: peers shall receive either all message parts of a message or none at all. The total number of message parts is unlimited except by available memory.

An application that processes multi-part messages must use the ZMQ_RCVMORE zmq_getsockopt(3) option after calling zmq_msg_recv() to determine if there are further parts to receive.


无论 "ugly" 这在第一次阅读时看起来如何,适合内存的最坏情况是多部分消息框架中有大量的小消息。

到 "get-rid-of-'em" 的结果时间当然不为零,但是紧凑高效的内部 ZMTP 遥测和低延迟流处理的好处是更重要的目标(并且已经实现)。

如有疑问,请先对最坏情况进行基准测试:

a) "produce" 大约 1E9 个多部分消息帧,传输零大小的有效载荷(没有数据,但所有消息框架)

b) "setup" 尽可能简单 "topology" PUSH/PULL

c) "select" transport-class 你的选择 { inproc:// | ipc:// | tipc:// | ... | vmci:// } - 最好的堆栈-less inproc://(我会用这个开始压力测试)

d) 秒表这样的盲目机械零捷径 "flusher" 在 ReferencePoint-S: 之间 zmq_poll( ZMQ_POLLIN ) 已经 POSACK 存在任何可读内容和 ReferencePoint-E: 当来自多部分多部分消息的最后一个被盲人-"flusher"-马戏团循环时。


结果解释:

[S][E] 之间花费的那几纳秒确实算作最坏情况的证据-案例 的时间 "scapegoated" 变成了一个故意盲目的 -"flusher" - 循环马戏团 。在现实世界的用例中,还有其他原因可能会花费更多时间做同样的事情。

然而,不要忘记发送的责任这样{明知如此大小|格式不正确的}-multi-frame-BEAST(s) 是根本原因在其他极端情况下处理此问题的任何操作风险-低延迟高-(几乎线性)-以可扩展性为重点 messaging/signaling框架。

这是零之禅的艺术,让这一切成为可能。感谢 Pieter HINTJENS 和他的团队,由 Martin SÚSTRIK 领导,我们都欠他们很多,因为他们能够继续使用他们的遗产。