NetworkStream ReadAsync 和 Read 同一个方法
NetworkStream ReadAsync and Read in the same method
我正在尝试构建具有持久 TCP 连接的可扩展 服务器应用程序。我使用的序列化库是同步的,转换成APM会造成很大的开销(我已经测试过了)。
数据包的格式始终是数据包 ID 的一个字节,后跟几个 header 字段和有效负载。我想知道,如果我创建了一个异步方法,例如:
public async Task<Packet> Deserialize(NetworkStream stream)
{
//Omitting parameters for the methods below for simplicity.
var id = await stream.ReadAsync();
var size = stream.Read();
//Read the rest of the fields synchronously and deserialize.
}
- 如果同步,我会冒着导致其他套接字饥饿的风险吗?
Read
其中之一占用过多(由于 TCP 碎片
例如)?
- 我想通过
ReadAsync
读取数据包的所有字节(大小是 header 中的第二个字段),然后同步反序列化它们 - 因为它是 non-blocking操作 - 但这迫使我将负载反序列化上下文与 header 的反序列化上下文分开,这将导致我编写大量重复代码。如果以上问题的答案是肯定的,还有更可行的方案吗?
Will I risk to cause starvation for other sockets
如果对于"socket",我们读"thread",答案是否定的。任务调度是自适应的;如果您的任务是调度程序确定的"long running",则将根据需要将更多工作线程投入使用。为任务提供服务的线程池经过精心设计,可以动态响应环境。
主要问题不是您会 运行 线程不足(我的意思是,您创建这么多应用程序停止响应是 可能 ,但可能性很小) 但它通过异步 I/O 将整个可伸缩性的想法抛出了 window。如果同步部分占主导地位,这可能比仅在线程上完成整个事情更糟糕。
Is there a more viable solution
总体思路是通过缓冲将反序列化与读取数据分离。读取一些字节,缓冲它们,确定您的缓冲区是否包含一个或多个完整的数据包,从缓冲区中删除这些数据包并反序列化它们(留下一些未反序列化的数据)。这要求您要么知道数据包的大小而不对其进行反序列化,要么您可以向反序列化逻辑提供一些字节,然后返回一个错误 "need more data"。除非反序列化器有副作用,否则这总是可以工作的。
The problem is that for example even just the size is not a simple
numeric value, and even its size isn't a fixed amount of bytes. I
could create an instance of the deserializer exclusively to read the
size synchronously (just about 3-4 bytes usually), then read the
payload asynchronously and then finally deserialize the latter but
that adds quite some pressure on the GC, as well as making the code
even more divided.
这里有两件事:
- 您不需要反序列化大小。只需给反序列化器一整块数据,包括大小,然后看看它吐出什么。这不起作用的唯一方法是,如果它 坚持 你交给它一个 精确 字节数(也就是说,不是 "too many").但是由于您的反序列化器是基于流的,所以我不认为这是一个问题。
- 如果您通过使用包装数组段而不是复制数据的 the appropriate constructor 直接在缓冲区上构造
MemoryStream
,GC 压力应该相当有限。现在您需要担心的是 MemoryStream
对象本身,但您通常只会创建在第 1 代中被清理的短期对象,所以不大。
大体思路是这样的:
byte[] buffer;
int offset = 0;
int bytesRead = await Stream.ReadAsync(buffer, offset, buffer.Length - offset);
int bytesRemaining = bytesRead;
while (bytesRemaining != 0 && haveCompletishPacket(buffer, offset, bytesRemaining)) {
using (var memoryStream = new MemoryStream(buffer, offset, bytesRead)) {
int size = deserializer.Deserialize(memoryStream);
// deserialize as much as possible, if you run out of data,
// just reinit the deserializer and return
// if we got here we have a packet, produce it
offset += memoryStream.Position;
bytesRemaining -= memoryStream.Position;
}
}
确保正确维护缓冲区的细节很容易出错,所以我可能在上面的代码中弄错了。尽管如此,我还是希望思路清晰。
显然,如果 haveCompletishPacket
可以在我们尝试反序列化器之前以 100% 的准确度告诉您缓冲区中是否有完整的数据包(如果您的数据包始终以恒定大小长度类型),但如果它尽力而为,它将是 "good enough",只要我们读取足够的数据并且数据包不是太大。
我正在尝试构建具有持久 TCP 连接的可扩展 服务器应用程序。我使用的序列化库是同步的,转换成APM会造成很大的开销(我已经测试过了)。
数据包的格式始终是数据包 ID 的一个字节,后跟几个 header 字段和有效负载。我想知道,如果我创建了一个异步方法,例如:
public async Task<Packet> Deserialize(NetworkStream stream)
{
//Omitting parameters for the methods below for simplicity.
var id = await stream.ReadAsync();
var size = stream.Read();
//Read the rest of the fields synchronously and deserialize.
}
- 如果同步,我会冒着导致其他套接字饥饿的风险吗?
Read
其中之一占用过多(由于 TCP 碎片 例如)? - 我想通过
ReadAsync
读取数据包的所有字节(大小是 header 中的第二个字段),然后同步反序列化它们 - 因为它是 non-blocking操作 - 但这迫使我将负载反序列化上下文与 header 的反序列化上下文分开,这将导致我编写大量重复代码。如果以上问题的答案是肯定的,还有更可行的方案吗?
Will I risk to cause starvation for other sockets
如果对于"socket",我们读"thread",答案是否定的。任务调度是自适应的;如果您的任务是调度程序确定的"long running",则将根据需要将更多工作线程投入使用。为任务提供服务的线程池经过精心设计,可以动态响应环境。
主要问题不是您会 运行 线程不足(我的意思是,您创建这么多应用程序停止响应是 可能 ,但可能性很小) 但它通过异步 I/O 将整个可伸缩性的想法抛出了 window。如果同步部分占主导地位,这可能比仅在线程上完成整个事情更糟糕。
Is there a more viable solution
总体思路是通过缓冲将反序列化与读取数据分离。读取一些字节,缓冲它们,确定您的缓冲区是否包含一个或多个完整的数据包,从缓冲区中删除这些数据包并反序列化它们(留下一些未反序列化的数据)。这要求您要么知道数据包的大小而不对其进行反序列化,要么您可以向反序列化逻辑提供一些字节,然后返回一个错误 "need more data"。除非反序列化器有副作用,否则这总是可以工作的。
The problem is that for example even just the size is not a simple numeric value, and even its size isn't a fixed amount of bytes. I could create an instance of the deserializer exclusively to read the size synchronously (just about 3-4 bytes usually), then read the payload asynchronously and then finally deserialize the latter but that adds quite some pressure on the GC, as well as making the code even more divided.
这里有两件事:
- 您不需要反序列化大小。只需给反序列化器一整块数据,包括大小,然后看看它吐出什么。这不起作用的唯一方法是,如果它 坚持 你交给它一个 精确 字节数(也就是说,不是 "too many").但是由于您的反序列化器是基于流的,所以我不认为这是一个问题。
- 如果您通过使用包装数组段而不是复制数据的 the appropriate constructor 直接在缓冲区上构造
MemoryStream
,GC 压力应该相当有限。现在您需要担心的是MemoryStream
对象本身,但您通常只会创建在第 1 代中被清理的短期对象,所以不大。
大体思路是这样的:
byte[] buffer;
int offset = 0;
int bytesRead = await Stream.ReadAsync(buffer, offset, buffer.Length - offset);
int bytesRemaining = bytesRead;
while (bytesRemaining != 0 && haveCompletishPacket(buffer, offset, bytesRemaining)) {
using (var memoryStream = new MemoryStream(buffer, offset, bytesRead)) {
int size = deserializer.Deserialize(memoryStream);
// deserialize as much as possible, if you run out of data,
// just reinit the deserializer and return
// if we got here we have a packet, produce it
offset += memoryStream.Position;
bytesRemaining -= memoryStream.Position;
}
}
确保正确维护缓冲区的细节很容易出错,所以我可能在上面的代码中弄错了。尽管如此,我还是希望思路清晰。
显然,如果 haveCompletishPacket
可以在我们尝试反序列化器之前以 100% 的准确度告诉您缓冲区中是否有完整的数据包(如果您的数据包始终以恒定大小长度类型),但如果它尽力而为,它将是 "good enough",只要我们读取足够的数据并且数据包不是太大。