如何从不同的任务同步读取和写入 NetworkStream?
How to synchronize reading and writing to a NetworkStream from different Tasks?
我有一个 class 使用以下方法处理我的网络连接:
SendNetworkMessageAsync
:此方法应在网络流上发送一个,如果设置了相应的参数,则等待响应直到发生超时。 classes NetworkMessageModel
和 NetworkCommandModel
是助手 classes,它处理转换、消息长度等。当调用这些方法时,相应的套接字已经创建并连接到服务器。
public async Task<NetworkCommandModel> SendNetworkMessageAsync(NetworkMessageModel msg, bool awaitAnswer, int timeout = 500)
{
var timeoutTask = Task.Delay(timeout);
_ = networkStream.WriteAsync(msg.EncodedMessageHeader, 0, msg.EncodedMessageHeader.Length);
_ = networkStream.WriteAsync(msg.EncodedMessageBody, 0, msg.EncodedMessageBody.Length);
if (!awaitAnswer) return null;
var taskReseiveResponse = ReceiveNetworkMessageAsync();
if (await Task.WhenAny(taskReseiveResponse, timeoutTask) == taskReseiveResponse)
{
return NetworkCommandModel.FromStream(taskReseiveResponse.Result.EncodedMessageBody);
}
else
{
return new NetworkCommandModel(Util.NetworkCommandTypeEnum.COMMANDFAILED, "");
}
}
ReceiveNetworkMessageAsync
和 ReadAsync
应该处理所有关于从网络流接收消息的事情。
private async Task<NetworkMessageModel> ReceiveNetworkMessageAsync()
{
var headerbytes = await ReadAsync(4);
int bodyLength = IPAddress.NetworkToHostOrder(BitConverter.ToInt32(headerbytes, 0));
var bodybytes = await ReadAsync(bodyLength);
return new NetworkMessageModel(bodybytes, headerbytes);
}
private async Task<byte[]> ReadAsync(int bytesToRead)
{
var buffer = new byte[bytesToRead];
var bytesRead = 0;
while (bytesRead < bytesToRead)
{
var bytesReceived = await networkStream.ReadAsync(buffer, bytesRead, (bytesToRead - bytesRead)).ConfigureAwait(false);
if (bytesReceived == 0)
{
throw new Exception("Socket Closed"); //TODO: Handle unexpected Socket closing error
}
bytesRead += bytesReceived;
}
return buffer;
}
这在大多数情况下工作正常。但是,如果从不同的任务同时调用这些方法,整个设置将失败。
例如:我已经实现了一个“连接检查”,这是一个任务,它会定期向服务器发送消息并在超时内等待响应。如果我从服务器收到一条长消息并且我的连接检查任务同时调用 ReceiveNetworkMessageAsync()
,则响应将是两个预期答案的混合。
我可以确定对 NetworkStream
的调用是一个接一个地调用吗?
我假设 TCP/IP 协议没有可用于区分响应的 command/response 标识符。如果协议 确实 有一个标识符,那么正常的解决方案是有一个未完成命令的字典,并在收到响应时通过标识符查找相关命令。但是如果协议没有标识符,您的代码一次只能发送一个命令,直到收到响应。我假设该协议对于此答案的其余部分没有此标识符。
有几种方法可以解决这个问题。更健壮和复杂的方法是创建一个服务员队列。每次调用更高级别 API 都会将一个服务员排入该队列。 waiter会包含一个TaskCompletionSource<T>
,最终用来完成高层返回的任务API。然后你会有一个单独的“运行者”任务,它只处理那个队列,先写,然后读,然后完成 TaskCompletionSource<T>
.
可行,但有点复杂。
也有可能更简单的方法可以满足您的需要。您的每个更高级别的 API 调用只需要 锁定 套接字直到其响应到达。然后当套接字已经忙时,其他命令将排队。 SemaphoreSlim
是一个异步兼容的互斥原语,适用于此。该实现比构建您自己的队列更简单,但您也会失去一些不错的行为,例如 FIFO(或优先请求,如果需要的话)。
更简单的互斥方法如下所示:
private readonly SemaphoreSlim _mutex = new SemaphoreSlim(1);
public async Task<NetworkCommandModel> SendNetworkMessageAsync(NetworkMessageModel msg, bool awaitAnswer, int timeout = 500)
{
await _mutex.WaitAsync();
try
{
... // existing code
}
finally
{
_mutex.Release();
}
}
其他说明:
- 不要丢弃任务。写入应该
await
ed。至少,这可以让您看到套接字错误。
- 发生超时时,套接字处于未知状态。它仍然是一个可行的连接吗?它是否收到了最后一个命令,所以下一个响应将针对超时命令?您的代码无法知道这些问题的答案,因此 唯一 对超时的正确响应是关闭套接字并重新连接。
我有一个 class 使用以下方法处理我的网络连接:
SendNetworkMessageAsync
:此方法应在网络流上发送一个,如果设置了相应的参数,则等待响应直到发生超时。 classes NetworkMessageModel
和 NetworkCommandModel
是助手 classes,它处理转换、消息长度等。当调用这些方法时,相应的套接字已经创建并连接到服务器。
public async Task<NetworkCommandModel> SendNetworkMessageAsync(NetworkMessageModel msg, bool awaitAnswer, int timeout = 500)
{
var timeoutTask = Task.Delay(timeout);
_ = networkStream.WriteAsync(msg.EncodedMessageHeader, 0, msg.EncodedMessageHeader.Length);
_ = networkStream.WriteAsync(msg.EncodedMessageBody, 0, msg.EncodedMessageBody.Length);
if (!awaitAnswer) return null;
var taskReseiveResponse = ReceiveNetworkMessageAsync();
if (await Task.WhenAny(taskReseiveResponse, timeoutTask) == taskReseiveResponse)
{
return NetworkCommandModel.FromStream(taskReseiveResponse.Result.EncodedMessageBody);
}
else
{
return new NetworkCommandModel(Util.NetworkCommandTypeEnum.COMMANDFAILED, "");
}
}
ReceiveNetworkMessageAsync
和 ReadAsync
应该处理所有关于从网络流接收消息的事情。
private async Task<NetworkMessageModel> ReceiveNetworkMessageAsync()
{
var headerbytes = await ReadAsync(4);
int bodyLength = IPAddress.NetworkToHostOrder(BitConverter.ToInt32(headerbytes, 0));
var bodybytes = await ReadAsync(bodyLength);
return new NetworkMessageModel(bodybytes, headerbytes);
}
private async Task<byte[]> ReadAsync(int bytesToRead)
{
var buffer = new byte[bytesToRead];
var bytesRead = 0;
while (bytesRead < bytesToRead)
{
var bytesReceived = await networkStream.ReadAsync(buffer, bytesRead, (bytesToRead - bytesRead)).ConfigureAwait(false);
if (bytesReceived == 0)
{
throw new Exception("Socket Closed"); //TODO: Handle unexpected Socket closing error
}
bytesRead += bytesReceived;
}
return buffer;
}
这在大多数情况下工作正常。但是,如果从不同的任务同时调用这些方法,整个设置将失败。
例如:我已经实现了一个“连接检查”,这是一个任务,它会定期向服务器发送消息并在超时内等待响应。如果我从服务器收到一条长消息并且我的连接检查任务同时调用 ReceiveNetworkMessageAsync()
,则响应将是两个预期答案的混合。
我可以确定对 NetworkStream
的调用是一个接一个地调用吗?
我假设 TCP/IP 协议没有可用于区分响应的 command/response 标识符。如果协议 确实 有一个标识符,那么正常的解决方案是有一个未完成命令的字典,并在收到响应时通过标识符查找相关命令。但是如果协议没有标识符,您的代码一次只能发送一个命令,直到收到响应。我假设该协议对于此答案的其余部分没有此标识符。
有几种方法可以解决这个问题。更健壮和复杂的方法是创建一个服务员队列。每次调用更高级别 API 都会将一个服务员排入该队列。 waiter会包含一个TaskCompletionSource<T>
,最终用来完成高层返回的任务API。然后你会有一个单独的“运行者”任务,它只处理那个队列,先写,然后读,然后完成 TaskCompletionSource<T>
.
可行,但有点复杂。
也有可能更简单的方法可以满足您的需要。您的每个更高级别的 API 调用只需要 锁定 套接字直到其响应到达。然后当套接字已经忙时,其他命令将排队。 SemaphoreSlim
是一个异步兼容的互斥原语,适用于此。该实现比构建您自己的队列更简单,但您也会失去一些不错的行为,例如 FIFO(或优先请求,如果需要的话)。
更简单的互斥方法如下所示:
private readonly SemaphoreSlim _mutex = new SemaphoreSlim(1);
public async Task<NetworkCommandModel> SendNetworkMessageAsync(NetworkMessageModel msg, bool awaitAnswer, int timeout = 500)
{
await _mutex.WaitAsync();
try
{
... // existing code
}
finally
{
_mutex.Release();
}
}
其他说明:
- 不要丢弃任务。写入应该
await
ed。至少,这可以让您看到套接字错误。 - 发生超时时,套接字处于未知状态。它仍然是一个可行的连接吗?它是否收到了最后一个命令,所以下一个响应将针对超时命令?您的代码无法知道这些问题的答案,因此 唯一 对超时的正确响应是关闭套接字并重新连接。