从 Http 流读取时如何取消 StreamReader 的 ReadLineAsync?

How to cancel a ReadLineAsync of StreamReader when reading from a Http stream?

我正在对一个 uri 发出 POST 请求,其中 returns 我是一个 JSON 数据流。这是在程序启动时创建的单独线程中完成的。如果重要的话,这是 Unity 项目的一部分。我的代码如下所示:

var request = new HttpRequestMessage(HttpMethod.Post, requestUri);

request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("text/event-stream"));
request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", _token.access_token);
request.Content = new StringContent(_jsonBody, Encoding.UTF8, "application/json");

HttpResponseMessage response = _client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead).Result;
if (response.IsSuccessStatusCode)
{
    var stream = await response.Content.ReadAsStreamAsync();
    StreamReader reader = new StreamReader(stream);

    while (!_stopFlag && !reader.EndOfStream)
    {
        string line = await reader.ReadLineAsync();
        if (string.IsNullOrWhiteSpace(line)) continue;
        // Do stuff with data

代码在等待数据到达然后处理它时运行良好,但是 StreamReader class 的方法 ReadLineAsync() 将阻塞直到数据到达。每当我想使用 Thread.Join() 优雅地退出程序,或者如果我想重新启动线程以发送 POST 正文的更改版本时,这都会产生问题。我已经尝试设置一个标志来通知我的线程退出,但是如果我阻塞了 while 循环中的一行代码,这显然不起作用。

在我的研究过程中,我意识到在 ReadLineAsync() 上使用取消令牌可能很难。 SO 上的解决方案建议在我的 HttpResponseMessage 上使用某种形式的 Wait() 方法,但这不适用于 HTTP 流。

我的实现有问题吗?如何正确取消 ReadLineAsync()?

这是许多不支持取消或超时的异步 IO 方法的已知问题。为了解决这个问题,我所做的是关闭您正在读取的流对象,这将导致异常。它不是很优雅,如果您发现自己取消了很多读取操作,它也不会很好地扩展,但它工作正常。

这里有一个非常简单的例子来说明这个概念。如果它是某种 Stream 或网络客户端,具体细节会根据您使用的底层对象而有所变化。

public void StartReceiving()
{
    _receivingCts = new CancellationTokenSource();
    _receivingTask = receivAsync(_receivingCts.Token);
}

// returns a task that completes when the receiving task completes
public async Task StopReceiving()
{
    try
    {
        _receivingCts?.Cancel();
        _stream?.Dispose(); //could also be a .Close() or similar
        await _receivingTask; 
    }
    catch
    {   
        // because _stream.ReceiveAsync doesn't take a cancellation
        // token, we end the receiving task by closing/disposing 
        // the stream while it is in use, this can generate an
        // exception
    }
    finally
    {
        _stream = null;
        _receivingCts = null;
        _receivingTask = null;
    }
}

private async Task ReceivAsync(CancellationToken token)
{
    try
    {
        _stream?.Close();
        _stream = /*create stream*/

        IsReceiving = true;
    
        while (!token.IsCancellationRequested)
        {
            var received = await _stream.Read****Async() 
                                        .ConfigureAwait(false);
            
            //do stuff with received;
        }
    }   
    catch (Exception ex)
    {   
        // if the token is cancelled, the exception is expected
        if (token.IsCancellationRequested)
            return; 
    }
    finally
    {
        IsReceiving = false;
    }
}

当 .NET 包含 tokens/timeouts 异步 IO 方法时,逻辑将大致相同。