使用 int NetworkStream.Read(Span<Bytes>) 接收完整的网络流

Receiving a complete network stream using int NetworkStream.Read(Span<Bytes>)

正如标题所说,我正在尝试将新的 (C# 8.0) object (Span) 用于我的网络项目。在我之前的实现中,我了解到在尝试使用其内容之前必须确保 NetworkStream 已接收到完整的缓冲区,否则,根据连接情况,另一端接收到的数据可能不完整。

while (true)
{
  while (!stream.DataAvailable)
  Thread.Sleep(10);

  int received = 0;
  byte[] response = new byte[consumerBufferSize];

  //Loop that forces the stream to read all incoming data before using it
  while (received < consumerBufferSize) 
    received += stream.Read(response, received, consumerBufferSize - received);

  string[] message = ObjectWrapper.ConvertByteArrayToObject<string>(response);
  consumerAction(this, message);
}

但是,引入了一种不同的方法来读取网络流数据 (Read(Span))。并假设 stackalloc 将有助于提高性能,我正在尝试迁移我的旧实现以适应此方法。这是它现在的样子:

while (true)
{
  while (!stream.DataAvailable)
    Thread.Sleep(10);

  Span<byte> response = stackalloc byte[consumerBufferSize];

  stream.Read(response);

  string[] message = ObjectWrapper.ConvertByteArrayToObject<string>(response).Split('|');
  consumerAction(this, message);
}

但是现在我如何确定缓冲区已被完全读取,因为它不提供像我使用的那样的方法?

编辑:

//Former methodd
int Read (byte[] buffer, int offset, int size);
//The one I am looking for
int Read (Span<byte> buffer, int offset, int size);

我不确定我是否理解您的问题。使用 Span<byte>.

时,您在第一个代码示例中依赖的所有相同功能仍然存在

Read(Span<byte>) 重载仍然 return 读取的字节数。由于 Span<byte> 不是缓冲区本身,而只是缓冲区中的 window,您可以更新 Span<byte> 值以指示读取其他数据的新起点。读取字节数并能够指定下一次读取的偏移量是复制旧示例中的功能所需的全部。当然,您目前没有任何代码可以保存原始缓冲区引用;你也需要添加它。

我希望这样的东西能正常工作:

while (true)
{
  while (!stream.DataAvailable)
    Thread.Sleep(10);

  byte* response = stackalloc byte[consumerBufferSize];

  while (received < consumerBufferSize) 
  {
    Span<byte> span = new Span<byte>(response, received, consumerBufferSize - received);

    received += stream.Read(span);
  }

  // process response here...
}

请注意,由于 stackalloc 的工作方式,这需要 unsafe 代码。您只能通过使用 Span<T> 并每次分配新块来避免这种情况。当然,那最终会吃光你所有的筹码。

由于在您的实施中您显然是在为这个无限循环指定一个线程,所以我看不出 stackalloc 有何帮助。您不妨在堆中分配一个 long-lived 缓冲区数组并使用它。

换句话说,我真的不明白这比仅将原始 Read(byte[], int, int) 重载与常规托管数组一起使用有何好处。但以上是您如何让代码工作。


旁白:您应该了解异步 API 的工作原理。由于您已经在使用 NetworkStream,因此 async/await 模式非常适合。而且无论您使用什么 API,循环检查 DataAvailable 都是垃圾。不要那样做。 Read() 方法已经是一个阻塞方法;你不需要等待数据出现在一个单独的循环中,因为 Read() 方法不会 return 直到有一些。

我只是添加一点额外的信息。

你说的函数有如下描述

public override int Read (Span<byte> buffer);

(来源:https://docs.microsoft.com/en-us/dotnet/api/system.net.sockets.networkstream.read?view=net-5.0

其中返回的 int 是从 NetworkStream 中读取的字节数。现在,如果我们查看 Span 函数,我们会发现具有以下描述的 Slice

public Span<T> Slice (int start);

(来源:https://docs.microsoft.com/en-us/dotnet/api/system.span-1.slice?view=net-5.0#system-span-1-slice(system-int32)

我们 Span 的 returns 部分,您可以使用它来将 stackalloc 的特定部分发送到您的 NetworkStream,而无需使用不安全代码。

重用您的代码,您可以使用类似这样的东西

while (true)
{
    while (!stream.DataAvailable)
        Thread.Sleep(10);

        int received = 0;
        Span<byte> response = stackalloc byte[consumerBufferSize];

        //Loop that forces the stream to read all incoming data before using it
        while (received < consumerBufferSize)
            received += stream.Read(response.Slice(received));

        string[] message = ObjectWrapper.ConvertByteArrayToObject<string>(response).Split('|');
        consumerAction(this, message);
}

简单来说,我们“创建”了一个新的 Span,它是初始 Span 的一部分,指向我们的带有 Slice 的 stackalloc,“start”参数允许我们选择从哪里开始这部分。然后将该部分传递给函数 read,它将在我们“开始”切片的任何地方开始写入我们的缓冲区。