两个 NetworkStreams 上的 ReadAsync() - 我是否正确使用 "await"?
ReadAsync() on two NetworkStreams - Am I using "await" correctly?
我正在使用两个 ReadAsync()
调用,Task.WhenAny()
来处理两个 NetworkStreams (TcpClient)。
以下 await 代码是否会遗漏任何数据捕获?
- 代码中的问题 1:如果两个流完全同时具有新数据会怎样?
- 代码中的问题 2:WriteAsync() 是否有可能花费太长时间并丢失存储缓冲区?
- 问题 3:有没有 "better" 方法来解决这个问题
我正在编写一段代码,旨在充当 TCP 流的中间人过滤器(稍后允许 filtering/monitoring 某些数据包)
广义逻辑应该是:
- 客户端与过滤器建立连接,然后过滤器与所选服务器建立新连接
- 如果有任何数据从客户端到达,保存并发送到服务器
- 如果有任何数据从服务器到达,保存并发送给客户端
存在错误处理(列出的地方)..我错过了什么重要的事情吗?
我使用以下 answer to a question about ".Net 4.5 Async Feature for Socket Programming" 作为起点:
var read_task_from_client = rx_stream.ReadAsync(rx_buffer, 0, ActiveBufferSize);
var read_task_from_server = tx_stream.ReadAsync(tx_buffer, 0, ActiveBufferSize);
try
{
while (true)
{
Task<int> read_task_occurred;
try
{
read_task_occurred = await Task.WhenAny(read_task_from_client, read_task_from_server);
//Q1: What happens if both streams have new data at EXACTLY the same time?
if (read_task_occurred.Status != TaskStatus.RanToCompletion)
{
Trace.WriteLine(string.Format("[{0}] - Task failure", ID, read_task_occurred.ToString()));
break;
}
}
catch (AggregateException aex)
{
for (int i = 0; i < aex.Data.Values.Count; i++)
{
var aex_item = aex.Data[i];
Trace.WriteLine(string.Format("[{0}] - Aggregate failure {1} - {2}", ID, i, aex_item));
}
break;
}
var bytes_read = read_task_occurred.Result;
if (read_task_occurred.Result == 0)
{
// If a read-operation returns zero, the stream has closed.
OneStreamHasClosed(read_task_from_client, read_task_from_server, read_task_occurred);
break;
}
if (read_task_occurred == read_task_from_client)
{
BytesFromClient += read_task_from_client.Result;
Trace.WriteLine(string.Format("[{0}] - Client-to-Server: {1}", ID, bytes_read));
await tx_stream.WriteAsync(rx_buffer, 0, bytes_read);
await FileStream_Incoming.WriteAsync(rx_buffer, 0, bytes_read);
// Q2: Any chance of the WriteAsync taking too long?
// (e.g. rx_buffer begins to be filled again before being written to tx_stream or the filestream)
read_task_from_client = rx_stream.ReadAsync(rx_buffer, 0, ActiveBufferSize);
}
else if (read_task_occurred == read_task_from_server)
{
BytesFromServer += read_task_from_server.Result;
Trace.WriteLine(string.Format("[{0}] - Server-to-Client: {1}", ID, bytes_read));
await rx_stream.WriteAsync(tx_buffer, 0, bytes_read);
await FileStream_Outgoing.WriteAsync(tx_buffer, 0, bytes_read);
read_task_from_server = tx_stream.ReadAsync(tx_buffer, 0, ActiveBufferSize);
}
}
}
finally
{
FileStream_Incoming.Close();
FileStream_Outgoing.Close();
}
到目前为止,这似乎按预期工作,捕获并记录了多个流...但是,我不确定我是否正在使用 await 语句 安全地
这将稍后 运行 在多个线程中(可能每个 Incoming-Connection 一个,但这是一个单独的主题)
更新(代码Q2)
通过如下重构原始 "await tx_stream.Write..." 和 "await xxx_FileStream.Write...",我相信我已经能够 改善 Q2 的一个主要竞争条件。 . 仍然不确定这是否是 "best/recommended" 解决方案:
// Code changed to a call to MultiWrite
private void MultiWrite(byte[] buffer, int bytes_read, Stream s1, Stream s2)
{
Task writer1 = s1.WriteAsync(buffer, 0, bytes_read);
Task writer2 = s2.WriteAsync(buffer, 0, bytes_read);
Task.WaitAll(writer1, writer2);
}
更新 2(await 的代码测试)
有人告诉我 await 不允许并发任务 运行... 这让我很困惑,因为我无法理解 how/why以下可以 运行...
private async Task<char> SimpleTask(char x, int sleep_ms) { return await Task.Run(() => { Console.Write(x); Thread.Sleep(sleep_ms); return x; }); }
internal async void DoStuff()
{
var a_task = SimpleTask('a', 100);
var b_task = SimpleTask('b', 250);
var c_task = SimpleTask('c', 333);
while (true)
{
var write_task_occurred = await Task.WhenAny(a_task, b_task, c_task);
var char_written = write_task_occurred.Result;
switch (char_written)
{
case 'a': a_task = SimpleTask('a', 100); break;
case 'b': b_task = SimpleTask('b', 250); break;
case 'c': c_task = SimpleTask('c', 333); break;
}
}
}
上面的代码片段 运行(正如我所料,会产生以下多线程废话:
aabacabaacabaacbaaabcaabacaabacabaabacaabacabaacabaacbaabacaabacabaacabaabacaab
谁能解释一下where/why上面的方法是错误的,如果是这样,如何改进。
更新 3:将逻辑拆分为两种方法
我已经集成了 "write to output-stream and a file, ensure both outputs have the data in 'buffer' before further Read()
",并且根据我之前对 Q2 的更新拆分了调用 MultiWrite()
的代码:
根据@usr 和@Pekka 的建议,我将代码分为以下两种方法...
private void ProcessStreams_Good()
{
Task t1 = CopyClientToServer(), t2 = CopyServerToClient();
Trace.WriteLine(string.Format("[{0}] - Data stats: C={1}, S={2}", ID, BytesFromClient, BytesFromServer));
Trace.WriteLine(string.Format("[{0}] - connection closed from {1}", ID, Incoming.Client.RemoteEndPoint));
}
private async void ProcessStreams_Broken()
{
await CopyClientToServer(); await CopyServerToClient();
Trace.WriteLine(string.Format("[{0}] - Data stats: C={1}, S={2}\r\n", ID, BytesFromClient, BytesFromServer));
Trace.WriteLine(string.Format("[{0}] - connection closed from {1}", ID, Incoming.Client.RemoteEndPoint));
}
private async Task CopyClientToServer()
{
var bytes_read = await rx_stream.ReadAsync(rx_buffer, 0, ActiveBufferSize);
while (bytes_read > 0)
{
BytesFromClient += bytes_read; Trace.WriteLine(string.Format("[{0}] - Client-to-Server: {1}", ID, bytes_read));
MultiWrite(rx_buffer, bytes_read, tx_stream, FileStream_FromClient);
bytes_read = await rx_stream.ReadAsync(rx_buffer, 0, ActiveBufferSize);
}
}
private async Task CopyServerToClient()
{
var bytes_read = await tx_stream.ReadAsync(tx_buffer, 0, ActiveBufferSize);
while (bytes_read > 0)
{
BytesFromClient += bytes_read; Trace.WriteLine(string.Format("[{0}] - Server-to-Client: {1}", ID, bytes_read));
MultiWrite(tx_buffer, bytes_read, rx_stream, FileStream_FromServer);
bytes_read = await tx_stream.ReadAsync(tx_buffer, 0, ActiveBufferSize);
}
}
是的,我知道 ProcessStreams_Broken()
失败而 ProcessStreams_Good()
按预期工作的原因。
问:这个新代码稍微简洁一些,但它是"better"吗?
延迟更新(问题关闭后)
问题结束后,我遇到了一个Best Practices for async/await link,这对我很有帮助。
await
和 WhenAny
不启动任何操作。他们只是等待 运行 操作完成。您已经开始的所有读取最终都会完成,并且将从流中获取数据。不管你观察不观察结果都是这样。
我知道您想将数据从客户端中继到服务器,然后从服务器中继到客户端。那么为什么不同时启动两个异步方法,每个方法执行两个中继方向之一呢?这消除了 WhenAny
和所有复杂逻辑的需要。你需要把这个扔掉。
Q1 in code: What happens if both streams have new data at EXACTLY the same time?
您不需要该问题的答案。您必须处理您开始的所有读取的完成,无论它们何时完成。否则,您将丢失数据。也许您假设未完成的未完成读取(以某种方式)被取消并且实际上只有一个读取 "taking"?!事实并非如此。所有读取完成。无法取消(不丢弃数据)。
Q2 in code: Is there any chance of the WriteAsync() taking too long and losing the stored-buffer?
不确定你的意思。如果发生超时,您需要一个处理该问题的策略。通常,您会记录错误并关闭。
并发是关于非确定性的。通道的两个端点必然有不同的时钟,并且无法分辨出你先收到了哪条消息(在时钟抖动范围内)。如果您(以及整个 OS 堆栈)公平地处理收到的消息并转发它们,那么发生这种情况的顺序就无关紧要了。
如果您想避免任何偏见,请开发一种确实会向任一方向引入任何偏好的情况。例如,您的测试 Task.WhenAny(read_task_from_client, read_task_from_server);
可能偏向于其中一项任务。使用@usr 的建议创建单独的方法来避免这种情况。
最后,拆除会话时要非常小心。不可能准确地模拟所有可能的情况,以突然从用户代码中删除端点之一可能做的事情。您的仿真保真度将受到此挑战,并可能使结果无效。同样,您可能已经接受了一个流上的数据,因为另一方放弃了会话。没有办法正确地从中恢复 - 你能再做的最好的事情就是假装合作伙伴在他们看到这个之前就放弃了。
我正在使用两个 ReadAsync()
调用,Task.WhenAny()
来处理两个 NetworkStreams (TcpClient)。
以下 await 代码是否会遗漏任何数据捕获?
- 代码中的问题 1:如果两个流完全同时具有新数据会怎样?
- 代码中的问题 2:WriteAsync() 是否有可能花费太长时间并丢失存储缓冲区?
- 问题 3:有没有 "better" 方法来解决这个问题
我正在编写一段代码,旨在充当 TCP 流的中间人过滤器(稍后允许 filtering/monitoring 某些数据包)
广义逻辑应该是:
- 客户端与过滤器建立连接,然后过滤器与所选服务器建立新连接
- 如果有任何数据从客户端到达,保存并发送到服务器
- 如果有任何数据从服务器到达,保存并发送给客户端
存在错误处理(列出的地方)..我错过了什么重要的事情吗?
我使用以下 answer to a question about ".Net 4.5 Async Feature for Socket Programming" 作为起点:
var read_task_from_client = rx_stream.ReadAsync(rx_buffer, 0, ActiveBufferSize);
var read_task_from_server = tx_stream.ReadAsync(tx_buffer, 0, ActiveBufferSize);
try
{
while (true)
{
Task<int> read_task_occurred;
try
{
read_task_occurred = await Task.WhenAny(read_task_from_client, read_task_from_server);
//Q1: What happens if both streams have new data at EXACTLY the same time?
if (read_task_occurred.Status != TaskStatus.RanToCompletion)
{
Trace.WriteLine(string.Format("[{0}] - Task failure", ID, read_task_occurred.ToString()));
break;
}
}
catch (AggregateException aex)
{
for (int i = 0; i < aex.Data.Values.Count; i++)
{
var aex_item = aex.Data[i];
Trace.WriteLine(string.Format("[{0}] - Aggregate failure {1} - {2}", ID, i, aex_item));
}
break;
}
var bytes_read = read_task_occurred.Result;
if (read_task_occurred.Result == 0)
{
// If a read-operation returns zero, the stream has closed.
OneStreamHasClosed(read_task_from_client, read_task_from_server, read_task_occurred);
break;
}
if (read_task_occurred == read_task_from_client)
{
BytesFromClient += read_task_from_client.Result;
Trace.WriteLine(string.Format("[{0}] - Client-to-Server: {1}", ID, bytes_read));
await tx_stream.WriteAsync(rx_buffer, 0, bytes_read);
await FileStream_Incoming.WriteAsync(rx_buffer, 0, bytes_read);
// Q2: Any chance of the WriteAsync taking too long?
// (e.g. rx_buffer begins to be filled again before being written to tx_stream or the filestream)
read_task_from_client = rx_stream.ReadAsync(rx_buffer, 0, ActiveBufferSize);
}
else if (read_task_occurred == read_task_from_server)
{
BytesFromServer += read_task_from_server.Result;
Trace.WriteLine(string.Format("[{0}] - Server-to-Client: {1}", ID, bytes_read));
await rx_stream.WriteAsync(tx_buffer, 0, bytes_read);
await FileStream_Outgoing.WriteAsync(tx_buffer, 0, bytes_read);
read_task_from_server = tx_stream.ReadAsync(tx_buffer, 0, ActiveBufferSize);
}
}
}
finally
{
FileStream_Incoming.Close();
FileStream_Outgoing.Close();
}
到目前为止,这似乎按预期工作,捕获并记录了多个流...但是,我不确定我是否正在使用 await 语句 安全地
这将稍后 运行 在多个线程中(可能每个 Incoming-Connection 一个,但这是一个单独的主题)
更新(代码Q2)
通过如下重构原始 "await tx_stream.Write..." 和 "await xxx_FileStream.Write...",我相信我已经能够 改善 Q2 的一个主要竞争条件。 . 仍然不确定这是否是 "best/recommended" 解决方案:
// Code changed to a call to MultiWrite
private void MultiWrite(byte[] buffer, int bytes_read, Stream s1, Stream s2)
{
Task writer1 = s1.WriteAsync(buffer, 0, bytes_read);
Task writer2 = s2.WriteAsync(buffer, 0, bytes_read);
Task.WaitAll(writer1, writer2);
}
更新 2(await 的代码测试)
有人告诉我 await 不允许并发任务 运行... 这让我很困惑,因为我无法理解 how/why以下可以 运行...
private async Task<char> SimpleTask(char x, int sleep_ms) { return await Task.Run(() => { Console.Write(x); Thread.Sleep(sleep_ms); return x; }); }
internal async void DoStuff()
{
var a_task = SimpleTask('a', 100);
var b_task = SimpleTask('b', 250);
var c_task = SimpleTask('c', 333);
while (true)
{
var write_task_occurred = await Task.WhenAny(a_task, b_task, c_task);
var char_written = write_task_occurred.Result;
switch (char_written)
{
case 'a': a_task = SimpleTask('a', 100); break;
case 'b': b_task = SimpleTask('b', 250); break;
case 'c': c_task = SimpleTask('c', 333); break;
}
}
}
上面的代码片段 运行(正如我所料,会产生以下多线程废话:
aabacabaacabaacbaaabcaabacaabacabaabacaabacabaacabaacbaabacaabacabaacabaabacaab
谁能解释一下where/why上面的方法是错误的,如果是这样,如何改进。
更新 3:将逻辑拆分为两种方法
我已经集成了 "write to output-stream and a file, ensure both outputs have the data in 'buffer' before further Read()
",并且根据我之前对 Q2 的更新拆分了调用 MultiWrite()
的代码:
根据@usr 和@Pekka 的建议,我将代码分为以下两种方法...
private void ProcessStreams_Good()
{
Task t1 = CopyClientToServer(), t2 = CopyServerToClient();
Trace.WriteLine(string.Format("[{0}] - Data stats: C={1}, S={2}", ID, BytesFromClient, BytesFromServer));
Trace.WriteLine(string.Format("[{0}] - connection closed from {1}", ID, Incoming.Client.RemoteEndPoint));
}
private async void ProcessStreams_Broken()
{
await CopyClientToServer(); await CopyServerToClient();
Trace.WriteLine(string.Format("[{0}] - Data stats: C={1}, S={2}\r\n", ID, BytesFromClient, BytesFromServer));
Trace.WriteLine(string.Format("[{0}] - connection closed from {1}", ID, Incoming.Client.RemoteEndPoint));
}
private async Task CopyClientToServer()
{
var bytes_read = await rx_stream.ReadAsync(rx_buffer, 0, ActiveBufferSize);
while (bytes_read > 0)
{
BytesFromClient += bytes_read; Trace.WriteLine(string.Format("[{0}] - Client-to-Server: {1}", ID, bytes_read));
MultiWrite(rx_buffer, bytes_read, tx_stream, FileStream_FromClient);
bytes_read = await rx_stream.ReadAsync(rx_buffer, 0, ActiveBufferSize);
}
}
private async Task CopyServerToClient()
{
var bytes_read = await tx_stream.ReadAsync(tx_buffer, 0, ActiveBufferSize);
while (bytes_read > 0)
{
BytesFromClient += bytes_read; Trace.WriteLine(string.Format("[{0}] - Server-to-Client: {1}", ID, bytes_read));
MultiWrite(tx_buffer, bytes_read, rx_stream, FileStream_FromServer);
bytes_read = await tx_stream.ReadAsync(tx_buffer, 0, ActiveBufferSize);
}
}
是的,我知道 ProcessStreams_Broken()
失败而 ProcessStreams_Good()
按预期工作的原因。
问:这个新代码稍微简洁一些,但它是"better"吗?
延迟更新(问题关闭后)
问题结束后,我遇到了一个Best Practices for async/await link,这对我很有帮助。
await
和 WhenAny
不启动任何操作。他们只是等待 运行 操作完成。您已经开始的所有读取最终都会完成,并且将从流中获取数据。不管你观察不观察结果都是这样。
我知道您想将数据从客户端中继到服务器,然后从服务器中继到客户端。那么为什么不同时启动两个异步方法,每个方法执行两个中继方向之一呢?这消除了 WhenAny
和所有复杂逻辑的需要。你需要把这个扔掉。
Q1 in code: What happens if both streams have new data at EXACTLY the same time?
您不需要该问题的答案。您必须处理您开始的所有读取的完成,无论它们何时完成。否则,您将丢失数据。也许您假设未完成的未完成读取(以某种方式)被取消并且实际上只有一个读取 "taking"?!事实并非如此。所有读取完成。无法取消(不丢弃数据)。
Q2 in code: Is there any chance of the WriteAsync() taking too long and losing the stored-buffer?
不确定你的意思。如果发生超时,您需要一个处理该问题的策略。通常,您会记录错误并关闭。
并发是关于非确定性的。通道的两个端点必然有不同的时钟,并且无法分辨出你先收到了哪条消息(在时钟抖动范围内)。如果您(以及整个 OS 堆栈)公平地处理收到的消息并转发它们,那么发生这种情况的顺序就无关紧要了。
如果您想避免任何偏见,请开发一种确实会向任一方向引入任何偏好的情况。例如,您的测试 Task.WhenAny(read_task_from_client, read_task_from_server);
可能偏向于其中一项任务。使用@usr 的建议创建单独的方法来避免这种情况。
最后,拆除会话时要非常小心。不可能准确地模拟所有可能的情况,以突然从用户代码中删除端点之一可能做的事情。您的仿真保真度将受到此挑战,并可能使结果无效。同样,您可能已经接受了一个流上的数据,因为另一方放弃了会话。没有办法正确地从中恢复 - 你能再做的最好的事情就是假装合作伙伴在他们看到这个之前就放弃了。