ChannelReader<T>.ReadAllAsync 在被 CancellationToken 取消时会抛出任何异常吗?

Does ChannelReader<T>.ReadAllAsync throw any exceptions when being canceled by a CancellationToken?

当被 CancellationToken 取消时,ChannelReader<T>.ReadAllAsync 会抛出任何异常吗?它似乎没有抛出 OperationCanceledException/TaskCanceledException?

我知道如果这两个方法以即用即忘的方式调用,即 _ = SendLoopAsync(); _ = ReceiveLoopAsync();,它会导致任务崩溃而没有显示 message/exception 因为它们没有被等待,这意味着我们正在失去例外。

我不希望它在不让我知道它实际上已取消 crashed/been 的情况下使该任务崩溃,这意味着我可能应该将整个 SendLoopAsync 包装在 try/catch 中,而不是介于两者之间ReadAllAsync 的分支。

一个代表其行为的小例子将不胜感激。

var clientWebSocket = new ClientWebSocket();
await clientWebSocket.ConnectAsync(new Uri("wss://www.deribit.com/ws/api/v2"), CancellationToken.None).ConfigureAwait(false);

var client = new ChannelWebSocket(clientWebSocket);

for (var i = 1; i <= 10; i++)
{
    client.Output.TryWrite($"Item: {i}");
}

var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(1));
await client.StartAsync(cts.Token).ConfigureAwait(false); // blocks the UI

Console.ReadLine();

public class ChannelExample
{
    private readonly WebSocket _webSocket;
    private readonly Channel<string> _input;
    private readonly Channel<string> _output;

    public ChannelExample(WebSocket webSocket)
    {
        _webSocket = webSocket ?? throw new ArgumentNullException(nameof(webSocket));

        _input = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
        {
            SingleWriter = true
        });

        _output = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
        {
            SingleReader = true
        });
    }

    public ChannelReader<string> Input => _input.Reader;
    public ChannelWriter<string> Output => _output.Writer;

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        var receiving = ReceiveLoopAsync(cancellationToken);
        var sending = SendLoopAsync(cancellationToken);

        var completedTask = await Task.WhenAny(receiving, sending).ConfigureAwait(false);

        if (completedTask.Exception != null)
        {
            Console.WriteLine("Exception");
        }
    }

    private async Task SendLoopAsync(CancellationToken cancellationToken)
    {
        await foreach (var message in _output.Reader.ReadAllAsync(cancellationToken))
        {
            Console.WriteLine($"Sending: {message}");
            await Task.Delay(5000, cancellationToken).ConfigureAwait(false);
        }
    }

    private async Task ReceiveLoopAsync(CancellationToken cancellationToken)
    {
        using var buffer = MemoryPool<byte>.Shared.Rent();

        while (_webSocket.State == WebSocketState.Open && !cancellationToken.IsCancellationRequested)
        {
            ValueWebSocketReceiveResult receiveResult;
            do
            {
                receiveResult = await _webSocket.ReceiveAsync(buffer.Memory, cancellationToken).ConfigureAwait(false);

                if (receiveResult.MessageType == WebSocketMessageType.Close)
                {
                    return;
                }
            } while (!receiveResult.EndOfMessage);
        }
    }
}

我怀疑它抛出;当然,您始终可以对其进行测试,但是 - 这是这种情况下的一般预期模式。所以你会用一个包裹它:

try
{
   // ...
}
catch (OperationCancelledException) when (cancellationToken.IsCancellationRequested)
{
    // treat as completion; swallow
}

或者:您可以将 CancellationToken.None 传递到通道读取 API,然后只使用编写器的完成来表示退出(确保在退出时对编写器调用 .Complete(...) ).

也就是说:ReadAllAsync 可能不是这里的首选 API,因为你并不真的 需要 它,因为 IAsyncEnumerable<T> - 所以最好使用本地频道 API,即

while (await _output.Reader.WaitToReadAsync(cancellationToken))
{
    while (_output.Reader.TryRead(out var message))
    {
        // ...
    }
}

我不确定 StartAsync 返回的 Task 代表什么:

public async Task StartAsync(CancellationToken cancellationToken)
{
    var receiving = ReceiveLoopAsync(cancellationToken);
    var sending = SendLoopAsync(cancellationToken);

    var completedTask = await Task.WhenAny(receiving, sending).ConfigureAwait(false);

    if (completedTask.Exception != null)
    {
        Console.WriteLine("Exception");
    }
}

好像代表了receivingsending任一个任务的完成,很奇怪。这很可能是尝试记录任务异常的意外结果。有比这更好的记录任务异常的方法,最简单的方法是将异步方法中的所有代码封装在 try/catch 块中。除此之外,Exception property of a Task is not-null only when the task IsFaulted, not when it IsCanceled.