如何从不同的任务同步读取和写入 NetworkStream?

How to synchronize reading and writing to a NetworkStream from different Tasks?

我有一个 class 使用以下方法处理我的网络连接:

SendNetworkMessageAsync:此方法应在网络流上发送一个,如果设置了相应的参数,则等待响应直到发生超时。 classes NetworkMessageModelNetworkCommandModel 是助手 classes,它处理转换、消息长度等。当调用这些方法时,相应的套接字已经创建并连接到服务器。

public async Task<NetworkCommandModel> SendNetworkMessageAsync(NetworkMessageModel msg, bool awaitAnswer, int timeout = 500)
{
    var timeoutTask = Task.Delay(timeout);
    _ = networkStream.WriteAsync(msg.EncodedMessageHeader, 0, msg.EncodedMessageHeader.Length);
    _ = networkStream.WriteAsync(msg.EncodedMessageBody, 0, msg.EncodedMessageBody.Length);

    if (!awaitAnswer) return null;

    var taskReseiveResponse = ReceiveNetworkMessageAsync();
    if (await Task.WhenAny(taskReseiveResponse, timeoutTask) == taskReseiveResponse)
    {
        return NetworkCommandModel.FromStream(taskReseiveResponse.Result.EncodedMessageBody);
    }
    else
    {
        return new NetworkCommandModel(Util.NetworkCommandTypeEnum.COMMANDFAILED, "");
    }
}

ReceiveNetworkMessageAsyncReadAsync 应该处理所有关于从网络流接收消息的事情。

private async Task<NetworkMessageModel> ReceiveNetworkMessageAsync()
{
    var headerbytes = await ReadAsync(4);
    int bodyLength = IPAddress.NetworkToHostOrder(BitConverter.ToInt32(headerbytes, 0));
    var bodybytes = await ReadAsync(bodyLength);
    return new NetworkMessageModel(bodybytes, headerbytes);
}

private async Task<byte[]> ReadAsync(int bytesToRead)
{
    var buffer = new byte[bytesToRead];
    var bytesRead = 0;

    while (bytesRead < bytesToRead)
    {
        var bytesReceived = await networkStream.ReadAsync(buffer, bytesRead, (bytesToRead - bytesRead)).ConfigureAwait(false);
        if (bytesReceived == 0)
        {
            throw new Exception("Socket Closed"); //TODO: Handle unexpected Socket closing error
        }
        bytesRead += bytesReceived;
    }
    
    return buffer;
}

这在大多数情况下工作正常。但是,如果从不同的任务同时调用这些方法,整个设置将失败。 例如:我已经实现了一个“连接检查”,这是一个任务,它会定期向服务器发送消息并在超时内等待响应。如果我从服务器收到一条长消息并且我的连接检查任务同时调用 ReceiveNetworkMessageAsync(),则响应将是两个预期答案的混合。

我可以确定对 NetworkStream 的调用是一个接一个地调用吗?

我假设 TCP/IP 协议没有可用于区分响应的 command/response 标识符。如果协议 确实 有一个标识符,那么正常的解决方案是有一个未完成命令的字典,并在收到响应时通过标识符查找相关命令。但是如果协议没有标识符,您的代码一次只能发送一个命令,直到收到响应。我假设该协议对于此答案的其余部分没有此标识符。

有几种方法可以解决这个问题。更健壮和复杂的方法是创建一个服务员队列。每次调用更高级别 API 都会将一个服务员排入该队列。 waiter会包含一个TaskCompletionSource<T>,最终用来完成高层返回的任务API。然后你会有一个单独的“运行者”任务,它只处理那个队列,先写,然后读,然后完成 TaskCompletionSource<T>.

可行,但有点复杂。

也有可能更简单的方法可以满足您的需要。您的每个更高级别的 API 调用只需要 锁定 套接字直到其响应到达。然后当套接字已经忙时,其他命令将排队。 SemaphoreSlim 是一个异步兼容的互斥原语,适用于此。该实现比构建您自己的队列更简单,但您也会失去一些不错的行为,例如 FIFO(或优先请求,如果需要的话)。

更简单的互斥方法如下所示:

private readonly SemaphoreSlim _mutex = new SemaphoreSlim(1);
public async Task<NetworkCommandModel> SendNetworkMessageAsync(NetworkMessageModel msg, bool awaitAnswer, int timeout = 500)
{
  await _mutex.WaitAsync();
  try
  {
    ... // existing code
  }
  finally
  {
    _mutex.Release();
  }
}

其他说明:

  • 不要丢弃任务。写入应该 awaited。至少,这可以让您看到套接字错误。
  • 发生超时时,套接字处于未知状态。它仍然是一个可行的连接吗?它是否收到了最后一个命令,所以下一个响应将针对超时命令?您的代码无法知道这些问题的答案,因此 唯一 对超时的正确响应是关闭套接字并重新连接。