处理不同线程读取的TCP Socket数据

Process TCP Socket data read in different thread

我正在开发一个程序来与每 70 毫秒通过 TCP 通道发送数据的设备集成。

我正在使用 Socket.BeginReceive 和 Socket.EndReceive 方法来读取数据。逻辑描述如下伪代码

private void OnReceived(IAsyncResult ar)
{
    var rcvdDataLength = m_tcpSocket.EndReceive(ar);
    Array.Copy(m_tempRecvBuffer, 0, m_mainBuffer, m_mainBufferDataIndex, rcvdDataLength);
    if (CheckIfValidHeaderAndBodyReceived())
    {
        var actualData = new byte[headerLen + BodyLen];
        Array.Copy(m_mainBuffer, m_dataIndex, actualData, 0, headerLen + BodyLen);
        Process(actualData);
    }
    m_tcpSocket.BeginReceive(m_tempRecvBuffer, 0, m_tempRecvBuffer.Length, SocketFlags.None,
        OnReceived, null);
}

上面代码中描述的流程函数负责实现业务逻辑。处理函数目前大约需要 300 毫秒。

因此消费者(需要 300 毫秒)比生产者(每 70 毫秒发布一次数据)慢。我是否需要异步 运行 这个 Process 函数来避免延迟?或者 flow control TCP 层的方面负责这个?

Do I need to run this Process function asynchronously to avoid the delay?

这取决于您的申请,最终取决于您自己的优先级决定。严格来说,不,您 不需要 异步执行任何操作,但它可能是一件好事。

我通常使用和推荐的方法是专用于通过队列与应用程序的其余部分交互的界面的线程。当通信线程接收消息时,它会锁定一个队列并将它们推入。当主应用程序准备好使用该数据时,它会锁定该队列并根据需要出列。这是一种简单、健壮且可靠的机制。转到您的伪代码:

private void OnReceived(IAsyncResult ar)
{
    var rcvdDataLength = m_tcpSocket.EndReceive(ar);
    Array.Copy(m_tempRecvBuffer, 0, m_mainBuffer, m_mainBufferDataIndex, rcvdDataLength);
    if (CheckIfValidHeaderAndBodyReceived())
    {
        var actualData = new byte[headerLen + BodyLen];
        Array.Copy(m_mainBuffer, m_dataIndex, actualData, 0, headerLen + BodyLen);
        lock(messageQueue)
        {
           messageQueue.Enqueue(actualData);
        }
    }
    m_tcpSocket.BeginReceive(m_tempRecvBuffer, 0, m_tempRecvBuffer.Length, SocketFlags.None,
        OnReceived, null);
}

然后在您的应用程序中的某处:

void ProcessQueue()
{
   Queue<byte[]> tempQueue = new Queue<byte[]>();
   lock(messageQueue)
   {
      // Drain the queue so we can release the lock ASAP
      while(messageQueue.Count > 0)
      {
         tempQueue.Enqueue(messageQueue.Dequeue());
      }
   }
   while(tempQueue.Count > 0)
   {
      Process(tempQueue.Dequeue());
   }
}