如何实现良好的消息队列

How to implement good messaging queue

我需要帮助来为消息队列找到最佳且可能最快的数据结构。

架构如下:客户端从外部 C++ 服务器获取消息(无法更改其代码)。消息格式为 msg.length + data.

在旧代码中,使用 TcpClient class 进行连接,使用 MemoryStream 作为队列。 Client 从此 TCPClient 读取数据并将其复制到流中。如果在此读取客户端期间读取了整个消息,它正在另一个线程中处理,一切都很完美。如果有部分消息或 2 条消息在一起,代码会变得非常混乱。

我有一种强烈的感觉,即可以用简单的方法编写代码。令我非常困扰的是 "play" 在 MemoryStream 中有一个指针,并且需要以某种方式从中删除旧数据。

您可以使用队列class;它就像一个 FIFO,先进先出。您(至少)需要两个线程,一个线程从套接字读取消息并入队到 FIFO,另一个线程出队消息并处理它们。您还需要使用 Mutex 以防止同时访问队列。这是代码:

    class MessagePacket
    {
        private byte[] data;
        private int length;

        public MessagePacket(int len, byte[] aData)
        {
            this.length = len;
            data = new byte[len];
            Array.Copy(aData, data, len);
        }
        public int Length()
        {
            return this.length;
        }
        public byte[] Data()
        {
            return this.data;
        }
    }

    static Queue<MessagePacket> MsgQueue = new Queue<MessagePacket>();
    static Mutex mutQueue = new Mutex();

    /// <summary> 
    ///     This thread read the message from the sever and put them in the queue.
    /// </summary>
    static void readSocket()
    {
        byte[] dataSize = new byte[4];
        while (true/*or ApplicationIsActive*/)
        {
            try
            {
                // it's assumed that data is a 32bit integer in network byte order
                if (ClientSocket.Receive(dataSize, 4, SocketFlags.None) != 4)
                {
                    return;
                }
                int size = BitConverter.ToInt32(dataSize, 0);
                size = IPAddress.NetworkToHostOrder(size);

                byte[] buffer = new byte[size];
                int offset = 0;
                while (size > 0)
                {
                    int ret = ClientSocket.Receive(buffer, offset, size, SocketFlags.None);
                    if (ret <= 0)
                    {
                        // Socket has been closed or there is an error, quit
                        return;
                    }
                    size -= ret;
                    offset += ret;
                }
                mutQueue.WaitOne();
                try { MsgQueue.Enqueue(new MessagePacket(size, buffer)); }
                finally { mutQueue.ReleaseMutex(); }
            }
            catch  
            {
                return;
            }
        }
    }

    /// <summary> 
    ///     This thread processes the messages in the queue.
    /// </summary>
    static void processMessages()
    {
        while (true/*or ApplicationIsActive*/)
        {
            if (MsgQueue.Count > 0)
            {
                MessagePacket msg;
                mutQueue.WaitOne();
                try { msg = MsgQueue.Dequeue(); }
                finally { mutQueue.ReleaseMutex(); }
                // Process the message: msg
            }
            else Thread.Sleep(50);
        }
    }