使用套接字的复杂 RPC

Complex RPC using sockets

让我这样解释我的问题:

在一个系统中,我们有两台机器 Alice 和 Bob。两人之间的对话是一种交易——一个人提出请求,另一个人作出回应。任何人都可以开始对话——如果爱丽丝先问然后鲍勃回答,反之亦然。

// Alice machine (pseudocode)
bind(socket, 90909)
listen(socket)
clientSocket = accept(socket)
read(clientSocket, buffer, len(buffer)
// everything is cool? 
// Oh no read is a block call, but how can i write? I have important info for Bob

这个系统的另一个特点是 Bob 在谈话中有更高的优先级,所以如果 Alice 开始问 Bob 但没有问完,Bob 认为他有更重要的消息,那么他会打断 Alice 并先问。

// Alice machine (pseudocode)
write(socket, buffer, len(buffer))
// in the call above Bob can`t interrupt Alice during write to socket
// read response
read(socket, rBuffer, len(rBuffer))

这就是上面描述的系统。它是网络中两台计算机之间的 RPC 类系统。如何使用套接字实现它?

It's RPC kind of system between two computers in a network. How can I implement it using sockets?

您需要将通信从套接字层抽象出来,以便何时如何消息的细节是sent 不再与调用程序的 explicit/individual socket-calls 紧密耦合。特别是,您需要摆脱阻塞内部 I/O 调用的想法;相反,您需要一种机制,使您的发送和接收发生 在后台 就您的应用程序逻辑而言,以便 I/O 操作的时间不会干扰与您的应用程序的业务逻辑。

您正在寻找的是一个消息传递系统——即一个中间件层,Alice 可以将完整的消息交给它并依靠它尽快将它传递给 Bob(当然,反之亦然) .也就是说,您希望您的 Alice 程序能够使用如下代码(伪代码)向 Bob 发送消息:

 Message * msg = new Message;
 msg->setSourceAddress("Alice");
 msg->setDestinationAddress("Bob");
 msg->setContents("Hi Bob, how are you today?");
 mailbox->postMessage(msg);  // always returns immediately!

... 另一方面,您需要某种机制,通过该机制可以通知 Bob 消息已到达,供他查看。这会有点棘手,因为为了接收通知,Bob 必须有某种事件循环让他阻塞直到有消息到达让他查看(伪代码):

Message * msg;
while((msg = GetNextIncomingMessage()) != NULL)  // we'll block here until a Message is received
{
   if (msg->getSourceAddress() == "Alice")
   {
      printf("Hey, Alice sent me a letter!\n");
      printf("It says: [%s]\n", msg->getContents());
   }
   delete msg;
}

...请注意,GetNextIncomingMessage() 可以写成阻塞,直到有消息到达 Bob 来处理,或者它可以写成从不阻塞,而是 return NULL 如果没有传入消息到达,或者等待指定的最长时间等

有各种免费软件库(ZeroMQ, RabbitMQ, MUSCLE 等)可以为您在套接字之上实现这种功能,如果您只是想完成这项工作,您我们建议您只选择您最喜欢的一个并使用它,因为滚动您自己的实现是一项 non-trivial 的工作量,并且可能很难做到正确。

就是说,我会在这个答案的其余部分假设您真的想自己做,或者作为学习练习,或者因为 none 现有库适合您 -- 并且您愿意为此花费数十到数千小时。

A) 您要做的第一件事是决定如何表示消息(其中 "a Message",就我们的目的而言,意味着要发送和接收的数据块作为一个原子单元;即接收者在接收到全部内容之前无法根据其内容采取有意义的操作)。根据您的用例,"Message" 可以像 NUL-terminated ASCII 字符串一样简单,也可以像完整的 SQL 数据库或其他一些复杂的数据结构一样复杂。无论您选择什么,您都应该知道套接字唯一知道如何发送或接收的是字节序列;所以你需要编写一个 Flatten-the-Message 函数(由发件人调用),它接受一个 Message 并将其转换为等效的 sequence-of-bytes,还有一个 Unflatten-the-Message 函数(将由接收者调用)接受 sequence-of-bytes 并从中重建那些字节打算表示的消息。 (在实现这些函数时,请记住潜在的 cross-platform-compatibility 陷阱,例如不同的 endian-ness、不同的 sizeof(int/long/float/etc)、不同的编译器结构成员打包等)

B) 接下来您需要决定要使用的传输机制。对于跨网络的消息传递,您实际上只有两种选择,UDP 和 TCP; UDP 会给您带来一些 difficult-to-resolve 挑战(阻止传入 UDP 数据包的防火墙、丢弃的 UDP 数据包、接收到的数据包 out-of-order、maximum-packet-size 限制等),所以我推荐 TCP,除非你有充分的理由不能使用它。

C) 一旦你决定了,你接下来需要决定(和记录,如果只是为了你自己的理智)一个你将用于 send/receive 消息。对于 TCP,这需要包括某种方式让接收方知道给定消息的数据在哪里结束以及下一个消息的开始。 (例如,如果您的消息真的只是 NUL-terminated 文本字符串,那么接收方可以查找 NUL 字节以查看消息结束的位置;或者如果它们更复杂,构建它们的常用方法是发送number-of-bytes-in-the-Flattened-byte-sequence 首先,作为固定大小 header 的一部分,然后发送 flattened-byte-sequence 本身,并根据需要再次重复下一条消息。这样接收方就可以读取 fixed-size header 首先,它会告诉接收者接下来要读取多少字节的有效载荷数据,然后再尝试将有效载荷数据展开回消息 object)

D) 一旦确定了协议,就可以开始处理 I/O 事件循环的发送部分。通常你会想要一个 in-memory FIFO 数据结构(即 linked-list、double-ended queue 或类似的)传出消息 object;这样,当调用代码调用 postMessage() 时,您只需将调用者的 Message object 添加到 FIFO 的末尾即可queue,而不是立即尝试通过套接字发送它。这样调用者就不必等待消息被发送;相反,它可以异步发送。当然,这会带来一个问题,即您希望如何实现发送代码以与用户自己的代码并行工作(因为您不希望 I/O 操作干扰用户代码,反之亦然) .一种处理方法是在单独的线程中执行 I/O;或者,您可以使用 select()/poll()/etc 将 I/O 集成到用户的线程中,但这可能会非常具有侵入性,因此我建议尽可能不要这样做。您的 send-event-loop 可能看起来像这样(伪代码):

current_byte_sequence_to_send = NULL;
while(true)
{
   // If we have no byte sequence that we're currently sending,
   // try to pop the next Message out of the queue and convert it
   // into bytes to send
   if ((current_byte_sequence_to_send == NULL)&&(outgoing_message_queue.length() > 0))
   {
      Message * next_msg = outgoing_message_queue.pop_front();
      current_byte_sequence_to_send = FlattenMessage(nextMsg);
      delete next_msg;
   }

   if (current_byte_sequence_to_send != NULL)
   {
      send_more_bytes_from_sequence(current_byte_sequence_to_send);
      if (current_byte_sequence_to_send->numBytesSent() == current_byte_sequence_to_send->size())
      {
         delete current_byte_sequence;
         current_byte_sequence_to_send = NULL;
      }
   }
}

完成上述工作后,您将有两个并行运行的事件循环:您的常规 Alice(或 Bob)用户程序,做任何事情,加上发送-I/O事件循环的唯一工作是尽可能快地耗尽 outgoing-messages FIFO queue,方法是依次将每个消息转换为字节并通过套接字发送这些字节。

E) 之后,最后一步是接收 I/O 循环。它几乎与发送 I/O 循环相反;它从套接字中读取 byte-sequences,尽快将它们 Unflatten() 到 Message object 中,然后将 Message object 返回给用户自己的代码(或者可能只需将它们添加到 FIFO 中,供用户代码稍后提取,具体取决于您要如何处理)

这就是它的骨架;还有很多其他事情要处理(例如 thread-safety 问题,当更多数据到达时有效的 cross-thread 信号,优雅地处理网络错误,优雅地处理糟糕的网络性能,决定在什么地方阻塞等待输入是合适的如果它不是,使协议 architecture-neutral 以便它可以在任何 CPU 上工作,等等)。我花了几年的时间开发 my implementation 完全让我满意。