如何实现良好的消息队列
How to implement good messaging queue
我需要帮助来为消息队列找到最佳且可能最快的数据结构。
架构如下:客户端从外部 C++ 服务器获取消息(无法更改其代码)。消息格式为 msg.length + data.
在旧代码中,使用 TcpClient class 进行连接,使用 MemoryStream 作为队列。 Client 从此 TCPClient 读取数据并将其复制到流中。如果在此读取客户端期间读取了整个消息,它正在另一个线程中处理,一切都很完美。如果有部分消息或 2 条消息在一起,代码会变得非常混乱。
我有一种强烈的感觉,即可以用简单的方法编写代码。令我非常困扰的是 "play" 在 MemoryStream 中有一个指针,并且需要以某种方式从中删除旧数据。
您可以使用队列class;它就像一个 FIFO,先进先出。您(至少)需要两个线程,一个线程从套接字读取消息并入队到 FIFO,另一个线程出队消息并处理它们。您还需要使用 Mutex 以防止同时访问队列。这是代码:
class MessagePacket
{
private byte[] data;
private int length;
public MessagePacket(int len, byte[] aData)
{
this.length = len;
data = new byte[len];
Array.Copy(aData, data, len);
}
public int Length()
{
return this.length;
}
public byte[] Data()
{
return this.data;
}
}
static Queue<MessagePacket> MsgQueue = new Queue<MessagePacket>();
static Mutex mutQueue = new Mutex();
/// <summary>
/// This thread read the message from the sever and put them in the queue.
/// </summary>
static void readSocket()
{
byte[] dataSize = new byte[4];
while (true/*or ApplicationIsActive*/)
{
try
{
// it's assumed that data is a 32bit integer in network byte order
if (ClientSocket.Receive(dataSize, 4, SocketFlags.None) != 4)
{
return;
}
int size = BitConverter.ToInt32(dataSize, 0);
size = IPAddress.NetworkToHostOrder(size);
byte[] buffer = new byte[size];
int offset = 0;
while (size > 0)
{
int ret = ClientSocket.Receive(buffer, offset, size, SocketFlags.None);
if (ret <= 0)
{
// Socket has been closed or there is an error, quit
return;
}
size -= ret;
offset += ret;
}
mutQueue.WaitOne();
try { MsgQueue.Enqueue(new MessagePacket(size, buffer)); }
finally { mutQueue.ReleaseMutex(); }
}
catch
{
return;
}
}
}
/// <summary>
/// This thread processes the messages in the queue.
/// </summary>
static void processMessages()
{
while (true/*or ApplicationIsActive*/)
{
if (MsgQueue.Count > 0)
{
MessagePacket msg;
mutQueue.WaitOne();
try { msg = MsgQueue.Dequeue(); }
finally { mutQueue.ReleaseMutex(); }
// Process the message: msg
}
else Thread.Sleep(50);
}
}
我需要帮助来为消息队列找到最佳且可能最快的数据结构。
架构如下:客户端从外部 C++ 服务器获取消息(无法更改其代码)。消息格式为 msg.length + data.
在旧代码中,使用 TcpClient class 进行连接,使用 MemoryStream 作为队列。 Client 从此 TCPClient 读取数据并将其复制到流中。如果在此读取客户端期间读取了整个消息,它正在另一个线程中处理,一切都很完美。如果有部分消息或 2 条消息在一起,代码会变得非常混乱。
我有一种强烈的感觉,即可以用简单的方法编写代码。令我非常困扰的是 "play" 在 MemoryStream 中有一个指针,并且需要以某种方式从中删除旧数据。
您可以使用队列class;它就像一个 FIFO,先进先出。您(至少)需要两个线程,一个线程从套接字读取消息并入队到 FIFO,另一个线程出队消息并处理它们。您还需要使用 Mutex 以防止同时访问队列。这是代码:
class MessagePacket
{
private byte[] data;
private int length;
public MessagePacket(int len, byte[] aData)
{
this.length = len;
data = new byte[len];
Array.Copy(aData, data, len);
}
public int Length()
{
return this.length;
}
public byte[] Data()
{
return this.data;
}
}
static Queue<MessagePacket> MsgQueue = new Queue<MessagePacket>();
static Mutex mutQueue = new Mutex();
/// <summary>
/// This thread read the message from the sever and put them in the queue.
/// </summary>
static void readSocket()
{
byte[] dataSize = new byte[4];
while (true/*or ApplicationIsActive*/)
{
try
{
// it's assumed that data is a 32bit integer in network byte order
if (ClientSocket.Receive(dataSize, 4, SocketFlags.None) != 4)
{
return;
}
int size = BitConverter.ToInt32(dataSize, 0);
size = IPAddress.NetworkToHostOrder(size);
byte[] buffer = new byte[size];
int offset = 0;
while (size > 0)
{
int ret = ClientSocket.Receive(buffer, offset, size, SocketFlags.None);
if (ret <= 0)
{
// Socket has been closed or there is an error, quit
return;
}
size -= ret;
offset += ret;
}
mutQueue.WaitOne();
try { MsgQueue.Enqueue(new MessagePacket(size, buffer)); }
finally { mutQueue.ReleaseMutex(); }
}
catch
{
return;
}
}
}
/// <summary>
/// This thread processes the messages in the queue.
/// </summary>
static void processMessages()
{
while (true/*or ApplicationIsActive*/)
{
if (MsgQueue.Count > 0)
{
MessagePacket msg;
mutQueue.WaitOne();
try { msg = MsgQueue.Dequeue(); }
finally { mutQueue.ReleaseMutex(); }
// Process the message: msg
}
else Thread.Sleep(50);
}
}