这种方法是否比在 Task.Run 中触发 stream.Read() 更好?
Is this approach better than just firing stream.Read() in a Task.Run?
编辑:我最终没有按照下面的 with Stephen Cleary. If you are interested in how I did it differently, take a look at my 使用这种方法。
我正在寻找一种从 NetworkStream
异步读取超时的方法。当然,问题是无法取消 NetworkStream
上的 ReadAsync()
,因为它只是忽略了 CancellationToken
。我读到一个建议在令牌取消时关闭流的答案,但在我的情况下,这不是一个选项,因为 Tcp 连接必须保持打开状态。所以我想出了以下代码,但我想知道这是否比
更好
Task.Run(() => stream.Read(buffer, offset, count)
并且只是阻塞了一个线程。
public static class TcpStreamExtension
{
public static async Task<int> ReadAsyncWithTimeout(this NetworkStream stream, byte[] buffer, int offset, int count)
{
CancellationTokenSource cts = new CancellationTokenSource();
bool keepTrying = true;
Timer timer = new Timer(stream.ReadTimeout);
timer.Elapsed += new ElapsedEventHandler((sender, args) => stopTrying(sender, args, cts, out keepTrying));
timer.Start();
try
{
if (stream.CanRead)
{
while (true)
{
if (stream.DataAvailable)
{
return await stream.ReadAsync(buffer, offset, count, cts.Token).ConfigureAwait(false);
}
if (keepTrying)
{
await Task.Delay(300, cts.Token).ConfigureAwait(false);
}
else
{
cts.Dispose();
timer.Dispose();
throw new IOException();
}
}
}
}
catch (TaskCanceledException tce)
{
// do nothing
}
finally
{
cts.Dispose();
timer.Dispose();
}
if (stream.DataAvailable)
{
return await stream.ReadAsync(buffer, offset, count).ConfigureAwait(false);
}
throw new IOException();
}
private static void stopTrying(object sender, ElapsedEventArgs args, CancellationTokenSource cts, out bool keepTrying)
{
keepTrying = false;
cts.Cancel();
}
}
该应用程序可能必须能够与数千个端点进行通信,我希望以一种不会阻塞一堆线程的方式创建它,因为它所做的大部分工作都是 IO。另外,超时的情况应该是
对于类似的用例,我使用了 Task.Delay()
任务来超时。
看起来像这样:
public static async Task<int> ReadAsync(
NetworkStream stream, byte[] buffer, int offset, int count, int timeoutMillis)
{
if (timeoutMillis < 0) throw new ArgumentException(nameof(timeoutMillis));
else if (timeoutMillis == 0)
{
// No timeout
return await stream.ReadAsync(buffer, offset, count);
}
var cts = new CancellationTokenSource();
var readTask = stream.ReadAsync(buffer, offset, count, cts.Token);
var timerTask = Task.Delay(timeoutMillis, cts.Token);
var finishedTask = await Task.WhenAny(readTask, timerTask);
var hasTimeout = ReferenceEquals(timerTask, finishedTask);
// Cancel the timer which might be still running
cts.Cancel();
cts.Dispose();
if (hasTimeout) throw new TimeoutException();
// No timeout occured
return readTask.Result;
}
首先,您尝试做的事情存在根本性缺陷。您应该一直从开放的 TCP/IP 流 读取 - 一旦一次读取获得一些数据,就将其传递并开始下一次读取。
因此,我的第一个建议是 不需要 首先需要可取消读取。相反,请始终阅读。同样,使用 DataAvailable
是一种代码味道。
更多解释...
对于不可取消的代码,"enforcing" 没有很好的取消方法。关闭 TCP/IP 套接字是最简单和最干净的方法。您现有的解决方案将不起作用,因为 ReadAsync
忽略了 CancellationToken
。所以它并不比只使用 CancellationToken
没有计时器更好。如果 ReadAsync
忽略 CancellationToken
,您唯一真正的选择是关闭套接字。任何其他解决方案都可能导致 "lost data" - 从套接字读取但被丢弃的数据。
根据 with Stephen Cleary 和他的建议,我重新审视了如何实现它,我采用了一种每次读取都不会超时但只要 TcpClient 就保持打开状态的方法打开然后控制来自不同代码的超时。我使用 Task.Run(() => beginReading());
,这样当然会使用池中的线程,但我认为这没关系,因为大多数时候该线程会触及 await
,因此是空闲的
这是我的实现:
private readonly Queue<byte> bigBuffer = new Queue<byte>();
private readonly SemaphoreSlim _signal = new SemaphoreSlim(0, 1);
// This is called in a Task.Run()
private async Task beginReading()
{
byte[] buffer = new byte[1024];
using (_shutdownToken.Register(() => m_TcpStream.Close()))
{
while (!_shutdownToken.IsCancellationRequested)
{
try
{
int bytesReceived = 0;
if (m_TcpStream.CanRead)
{
bytesReceived = await m_TcpStream.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false);
}
else
{
// in case the stream is not working, wait a little bit
await Task.Delay(3000, _shutdownToken);
}
if (bytesReceived > 0)
{
for (int i = 0; i < bytesReceived; i++)
{
bigBuffer.Enqueue(buffer[i]);
}
_signal.Release();
Array.Clear(buffer, 0, buffer.Length);
}
}
catch (Exception e)
{
LoggingService.Log(e);
}
}
}
}
private async Task<int> ReadAsyncWithTimeout(byte[] buffer, int offset, int count)
{
int bytesToBeRead = 0;
if (!m_TcpClient.Connected)
{
throw new ObjectDisposedException("Socket is not connected");
}
if (bigBuffer.Count > 0)
{
bytesToBeRead = bigBuffer.Count < count ? bigBuffer.Count : count;
for (int i = offset; i < bytesToBeRead; i++)
{
buffer[i] = bigBuffer.Dequeue();
}
// Clear up the semaphore in case of a race condition where the writer just wrote and then this came in and read it without waiting
if (_signal.CurrentCount > 0)
await _signal.WaitAsync(BabelfishConst.TCPIP_READ_TIME_OUT_IN_MS, _shutdownToken).ConfigureAwait(false);
return bytesToBeRead;
}
// In case there is nothing in the Q, wait up to timeout to get data from the writer
await _signal.WaitAsync(15000, _shutdownToken).ConfigureAwait(false);
// read again in case the semaphore was signaled by an Enqueue
if (bigBuffer.Count > 0)
{
bytesToBeRead = bigBuffer.Count < count ? bigBuffer.Count : count;
for (int i = offset; i < bytesToBeRead; i++)
{
buffer[i] = bigBuffer.Dequeue();
}
return bytesToBeRead;
}
// This is because the synchronous NetworkStream Read() method throws this exception when it times out
throw new IOException();
}
编辑:我最终没有按照下面的
我正在寻找一种从 NetworkStream
异步读取超时的方法。当然,问题是无法取消 NetworkStream
上的 ReadAsync()
,因为它只是忽略了 CancellationToken
。我读到一个建议在令牌取消时关闭流的答案,但在我的情况下,这不是一个选项,因为 Tcp 连接必须保持打开状态。所以我想出了以下代码,但我想知道这是否比
Task.Run(() => stream.Read(buffer, offset, count)
并且只是阻塞了一个线程。
public static class TcpStreamExtension
{
public static async Task<int> ReadAsyncWithTimeout(this NetworkStream stream, byte[] buffer, int offset, int count)
{
CancellationTokenSource cts = new CancellationTokenSource();
bool keepTrying = true;
Timer timer = new Timer(stream.ReadTimeout);
timer.Elapsed += new ElapsedEventHandler((sender, args) => stopTrying(sender, args, cts, out keepTrying));
timer.Start();
try
{
if (stream.CanRead)
{
while (true)
{
if (stream.DataAvailable)
{
return await stream.ReadAsync(buffer, offset, count, cts.Token).ConfigureAwait(false);
}
if (keepTrying)
{
await Task.Delay(300, cts.Token).ConfigureAwait(false);
}
else
{
cts.Dispose();
timer.Dispose();
throw new IOException();
}
}
}
}
catch (TaskCanceledException tce)
{
// do nothing
}
finally
{
cts.Dispose();
timer.Dispose();
}
if (stream.DataAvailable)
{
return await stream.ReadAsync(buffer, offset, count).ConfigureAwait(false);
}
throw new IOException();
}
private static void stopTrying(object sender, ElapsedEventArgs args, CancellationTokenSource cts, out bool keepTrying)
{
keepTrying = false;
cts.Cancel();
}
}
该应用程序可能必须能够与数千个端点进行通信,我希望以一种不会阻塞一堆线程的方式创建它,因为它所做的大部分工作都是 IO。另外,超时的情况应该是
对于类似的用例,我使用了 Task.Delay()
任务来超时。
看起来像这样:
public static async Task<int> ReadAsync(
NetworkStream stream, byte[] buffer, int offset, int count, int timeoutMillis)
{
if (timeoutMillis < 0) throw new ArgumentException(nameof(timeoutMillis));
else if (timeoutMillis == 0)
{
// No timeout
return await stream.ReadAsync(buffer, offset, count);
}
var cts = new CancellationTokenSource();
var readTask = stream.ReadAsync(buffer, offset, count, cts.Token);
var timerTask = Task.Delay(timeoutMillis, cts.Token);
var finishedTask = await Task.WhenAny(readTask, timerTask);
var hasTimeout = ReferenceEquals(timerTask, finishedTask);
// Cancel the timer which might be still running
cts.Cancel();
cts.Dispose();
if (hasTimeout) throw new TimeoutException();
// No timeout occured
return readTask.Result;
}
首先,您尝试做的事情存在根本性缺陷。您应该一直从开放的 TCP/IP 流 读取 - 一旦一次读取获得一些数据,就将其传递并开始下一次读取。
因此,我的第一个建议是 不需要 首先需要可取消读取。相反,请始终阅读。同样,使用 DataAvailable
是一种代码味道。
更多解释...
对于不可取消的代码,"enforcing" 没有很好的取消方法。关闭 TCP/IP 套接字是最简单和最干净的方法。您现有的解决方案将不起作用,因为 ReadAsync
忽略了 CancellationToken
。所以它并不比只使用 CancellationToken
没有计时器更好。如果 ReadAsync
忽略 CancellationToken
,您唯一真正的选择是关闭套接字。任何其他解决方案都可能导致 "lost data" - 从套接字读取但被丢弃的数据。
根据 Task.Run(() => beginReading());
,这样当然会使用池中的线程,但我认为这没关系,因为大多数时候该线程会触及 await
,因此是空闲的
这是我的实现:
private readonly Queue<byte> bigBuffer = new Queue<byte>();
private readonly SemaphoreSlim _signal = new SemaphoreSlim(0, 1);
// This is called in a Task.Run()
private async Task beginReading()
{
byte[] buffer = new byte[1024];
using (_shutdownToken.Register(() => m_TcpStream.Close()))
{
while (!_shutdownToken.IsCancellationRequested)
{
try
{
int bytesReceived = 0;
if (m_TcpStream.CanRead)
{
bytesReceived = await m_TcpStream.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false);
}
else
{
// in case the stream is not working, wait a little bit
await Task.Delay(3000, _shutdownToken);
}
if (bytesReceived > 0)
{
for (int i = 0; i < bytesReceived; i++)
{
bigBuffer.Enqueue(buffer[i]);
}
_signal.Release();
Array.Clear(buffer, 0, buffer.Length);
}
}
catch (Exception e)
{
LoggingService.Log(e);
}
}
}
}
private async Task<int> ReadAsyncWithTimeout(byte[] buffer, int offset, int count)
{
int bytesToBeRead = 0;
if (!m_TcpClient.Connected)
{
throw new ObjectDisposedException("Socket is not connected");
}
if (bigBuffer.Count > 0)
{
bytesToBeRead = bigBuffer.Count < count ? bigBuffer.Count : count;
for (int i = offset; i < bytesToBeRead; i++)
{
buffer[i] = bigBuffer.Dequeue();
}
// Clear up the semaphore in case of a race condition where the writer just wrote and then this came in and read it without waiting
if (_signal.CurrentCount > 0)
await _signal.WaitAsync(BabelfishConst.TCPIP_READ_TIME_OUT_IN_MS, _shutdownToken).ConfigureAwait(false);
return bytesToBeRead;
}
// In case there is nothing in the Q, wait up to timeout to get data from the writer
await _signal.WaitAsync(15000, _shutdownToken).ConfigureAwait(false);
// read again in case the semaphore was signaled by an Enqueue
if (bigBuffer.Count > 0)
{
bytesToBeRead = bigBuffer.Count < count ? bigBuffer.Count : count;
for (int i = offset; i < bytesToBeRead; i++)
{
buffer[i] = bigBuffer.Dequeue();
}
return bytesToBeRead;
}
// This is because the synchronous NetworkStream Read() method throws this exception when it times out
throw new IOException();
}