从 Http 流读取时如何取消 StreamReader 的 ReadLineAsync?
How to cancel a ReadLineAsync of StreamReader when reading from a Http stream?
我正在对一个 uri 发出 POST 请求,其中 returns 我是一个 JSON 数据流。这是在程序启动时创建的单独线程中完成的。如果重要的话,这是 Unity 项目的一部分。我的代码如下所示:
var request = new HttpRequestMessage(HttpMethod.Post, requestUri);
request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("text/event-stream"));
request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", _token.access_token);
request.Content = new StringContent(_jsonBody, Encoding.UTF8, "application/json");
HttpResponseMessage response = _client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead).Result;
if (response.IsSuccessStatusCode)
{
var stream = await response.Content.ReadAsStreamAsync();
StreamReader reader = new StreamReader(stream);
while (!_stopFlag && !reader.EndOfStream)
{
string line = await reader.ReadLineAsync();
if (string.IsNullOrWhiteSpace(line)) continue;
// Do stuff with data
代码在等待数据到达然后处理它时运行良好,但是 StreamReader
class 的方法 ReadLineAsync()
将阻塞直到数据到达。每当我想使用 Thread.Join()
优雅地退出程序,或者如果我想重新启动线程以发送 POST 正文的更改版本时,这都会产生问题。我已经尝试设置一个标志来通知我的线程退出,但是如果我阻塞了 while 循环中的一行代码,这显然不起作用。
在我的研究过程中,我意识到在 ReadLineAsync()
上使用取消令牌可能很难。 SO 上的解决方案建议在我的 HttpResponseMessage
上使用某种形式的 Wait()
方法,但这不适用于 HTTP 流。
我的实现有问题吗?如何正确取消 ReadLineAsync()
?
这是许多不支持取消或超时的异步 IO 方法的已知问题。为了解决这个问题,我所做的是关闭您正在读取的流对象,这将导致异常。它不是很优雅,如果您发现自己取消了很多读取操作,它也不会很好地扩展,但它工作正常。
这里有一个非常简单的例子来说明这个概念。如果它是某种 Stream 或网络客户端,具体细节会根据您使用的底层对象而有所变化。
public void StartReceiving()
{
_receivingCts = new CancellationTokenSource();
_receivingTask = receivAsync(_receivingCts.Token);
}
// returns a task that completes when the receiving task completes
public async Task StopReceiving()
{
try
{
_receivingCts?.Cancel();
_stream?.Dispose(); //could also be a .Close() or similar
await _receivingTask;
}
catch
{
// because _stream.ReceiveAsync doesn't take a cancellation
// token, we end the receiving task by closing/disposing
// the stream while it is in use, this can generate an
// exception
}
finally
{
_stream = null;
_receivingCts = null;
_receivingTask = null;
}
}
private async Task ReceivAsync(CancellationToken token)
{
try
{
_stream?.Close();
_stream = /*create stream*/
IsReceiving = true;
while (!token.IsCancellationRequested)
{
var received = await _stream.Read****Async()
.ConfigureAwait(false);
//do stuff with received;
}
}
catch (Exception ex)
{
// if the token is cancelled, the exception is expected
if (token.IsCancellationRequested)
return;
}
finally
{
IsReceiving = false;
}
}
当 .NET 包含 tokens/timeouts 异步 IO 方法时,逻辑将大致相同。
我正在对一个 uri 发出 POST 请求,其中 returns 我是一个 JSON 数据流。这是在程序启动时创建的单独线程中完成的。如果重要的话,这是 Unity 项目的一部分。我的代码如下所示:
var request = new HttpRequestMessage(HttpMethod.Post, requestUri);
request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("text/event-stream"));
request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", _token.access_token);
request.Content = new StringContent(_jsonBody, Encoding.UTF8, "application/json");
HttpResponseMessage response = _client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead).Result;
if (response.IsSuccessStatusCode)
{
var stream = await response.Content.ReadAsStreamAsync();
StreamReader reader = new StreamReader(stream);
while (!_stopFlag && !reader.EndOfStream)
{
string line = await reader.ReadLineAsync();
if (string.IsNullOrWhiteSpace(line)) continue;
// Do stuff with data
代码在等待数据到达然后处理它时运行良好,但是 StreamReader
class 的方法 ReadLineAsync()
将阻塞直到数据到达。每当我想使用 Thread.Join()
优雅地退出程序,或者如果我想重新启动线程以发送 POST 正文的更改版本时,这都会产生问题。我已经尝试设置一个标志来通知我的线程退出,但是如果我阻塞了 while 循环中的一行代码,这显然不起作用。
在我的研究过程中,我意识到在 ReadLineAsync()
上使用取消令牌可能很难。 SO 上的解决方案建议在我的 HttpResponseMessage
上使用某种形式的 Wait()
方法,但这不适用于 HTTP 流。
我的实现有问题吗?如何正确取消 ReadLineAsync()
?
这是许多不支持取消或超时的异步 IO 方法的已知问题。为了解决这个问题,我所做的是关闭您正在读取的流对象,这将导致异常。它不是很优雅,如果您发现自己取消了很多读取操作,它也不会很好地扩展,但它工作正常。
这里有一个非常简单的例子来说明这个概念。如果它是某种 Stream 或网络客户端,具体细节会根据您使用的底层对象而有所变化。
public void StartReceiving()
{
_receivingCts = new CancellationTokenSource();
_receivingTask = receivAsync(_receivingCts.Token);
}
// returns a task that completes when the receiving task completes
public async Task StopReceiving()
{
try
{
_receivingCts?.Cancel();
_stream?.Dispose(); //could also be a .Close() or similar
await _receivingTask;
}
catch
{
// because _stream.ReceiveAsync doesn't take a cancellation
// token, we end the receiving task by closing/disposing
// the stream while it is in use, this can generate an
// exception
}
finally
{
_stream = null;
_receivingCts = null;
_receivingTask = null;
}
}
private async Task ReceivAsync(CancellationToken token)
{
try
{
_stream?.Close();
_stream = /*create stream*/
IsReceiving = true;
while (!token.IsCancellationRequested)
{
var received = await _stream.Read****Async()
.ConfigureAwait(false);
//do stuff with received;
}
}
catch (Exception ex)
{
// if the token is cancelled, the exception is expected
if (token.IsCancellationRequested)
return;
}
finally
{
IsReceiving = false;
}
}
当 .NET 包含 tokens/timeouts 异步 IO 方法时,逻辑将大致相同。