ChannelReader<T>.ReadAllAsync 在被 CancellationToken 取消时会抛出任何异常吗?
Does ChannelReader<T>.ReadAllAsync throw any exceptions when being canceled by a CancellationToken?
当被 CancellationToken 取消时,ChannelReader<T>.ReadAllAsync
会抛出任何异常吗?它似乎没有抛出 OperationCanceledException/TaskCanceledException?
我知道如果这两个方法以即用即忘的方式调用,即 _ = SendLoopAsync(); _ = ReceiveLoopAsync();
,它会导致任务崩溃而没有显示 message/exception 因为它们没有被等待,这意味着我们正在失去例外。
我不希望它在不让我知道它实际上已取消 crashed/been 的情况下使该任务崩溃,这意味着我可能应该将整个 SendLoopAsync 包装在 try/catch 中,而不是介于两者之间ReadAllAsync 的分支。
一个代表其行为的小例子将不胜感激。
var clientWebSocket = new ClientWebSocket();
await clientWebSocket.ConnectAsync(new Uri("wss://www.deribit.com/ws/api/v2"), CancellationToken.None).ConfigureAwait(false);
var client = new ChannelWebSocket(clientWebSocket);
for (var i = 1; i <= 10; i++)
{
client.Output.TryWrite($"Item: {i}");
}
var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(1));
await client.StartAsync(cts.Token).ConfigureAwait(false); // blocks the UI
Console.ReadLine();
public class ChannelExample
{
private readonly WebSocket _webSocket;
private readonly Channel<string> _input;
private readonly Channel<string> _output;
public ChannelExample(WebSocket webSocket)
{
_webSocket = webSocket ?? throw new ArgumentNullException(nameof(webSocket));
_input = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
{
SingleWriter = true
});
_output = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
{
SingleReader = true
});
}
public ChannelReader<string> Input => _input.Reader;
public ChannelWriter<string> Output => _output.Writer;
public async Task StartAsync(CancellationToken cancellationToken)
{
var receiving = ReceiveLoopAsync(cancellationToken);
var sending = SendLoopAsync(cancellationToken);
var completedTask = await Task.WhenAny(receiving, sending).ConfigureAwait(false);
if (completedTask.Exception != null)
{
Console.WriteLine("Exception");
}
}
private async Task SendLoopAsync(CancellationToken cancellationToken)
{
await foreach (var message in _output.Reader.ReadAllAsync(cancellationToken))
{
Console.WriteLine($"Sending: {message}");
await Task.Delay(5000, cancellationToken).ConfigureAwait(false);
}
}
private async Task ReceiveLoopAsync(CancellationToken cancellationToken)
{
using var buffer = MemoryPool<byte>.Shared.Rent();
while (_webSocket.State == WebSocketState.Open && !cancellationToken.IsCancellationRequested)
{
ValueWebSocketReceiveResult receiveResult;
do
{
receiveResult = await _webSocket.ReceiveAsync(buffer.Memory, cancellationToken).ConfigureAwait(false);
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
return;
}
} while (!receiveResult.EndOfMessage);
}
}
}
我怀疑它会抛出;当然,您始终可以对其进行测试,但是 - 这是这种情况下的一般预期模式。所以你会用一个包裹它:
try
{
// ...
}
catch (OperationCancelledException) when (cancellationToken.IsCancellationRequested)
{
// treat as completion; swallow
}
或者:您可以将 CancellationToken.None
传递到通道读取 API,然后只使用编写器的完成来表示退出(确保在退出时对编写器调用 .Complete(...)
).
也就是说:ReadAllAsync
可能不是这里的首选 API,因为你并不真的 需要 它,因为 IAsyncEnumerable<T>
- 所以最好使用本地频道 API,即
while (await _output.Reader.WaitToReadAsync(cancellationToken))
{
while (_output.Reader.TryRead(out var message))
{
// ...
}
}
我不确定 StartAsync
返回的 Task
代表什么:
public async Task StartAsync(CancellationToken cancellationToken)
{
var receiving = ReceiveLoopAsync(cancellationToken);
var sending = SendLoopAsync(cancellationToken);
var completedTask = await Task.WhenAny(receiving, sending).ConfigureAwait(false);
if (completedTask.Exception != null)
{
Console.WriteLine("Exception");
}
}
好像代表了receiving
和sending
任一个任务的完成,很奇怪。这很可能是尝试记录任务异常的意外结果。有比这更好的记录任务异常的方法,最简单的方法是将异步方法中的所有代码封装在 try
/catch
块中。除此之外,Exception
property of a Task
is not-null only when the task IsFaulted
, not when it IsCanceled
.
当被 CancellationToken 取消时,ChannelReader<T>.ReadAllAsync
会抛出任何异常吗?它似乎没有抛出 OperationCanceledException/TaskCanceledException?
我知道如果这两个方法以即用即忘的方式调用,即 _ = SendLoopAsync(); _ = ReceiveLoopAsync();
,它会导致任务崩溃而没有显示 message/exception 因为它们没有被等待,这意味着我们正在失去例外。
我不希望它在不让我知道它实际上已取消 crashed/been 的情况下使该任务崩溃,这意味着我可能应该将整个 SendLoopAsync 包装在 try/catch 中,而不是介于两者之间ReadAllAsync 的分支。
一个代表其行为的小例子将不胜感激。
var clientWebSocket = new ClientWebSocket();
await clientWebSocket.ConnectAsync(new Uri("wss://www.deribit.com/ws/api/v2"), CancellationToken.None).ConfigureAwait(false);
var client = new ChannelWebSocket(clientWebSocket);
for (var i = 1; i <= 10; i++)
{
client.Output.TryWrite($"Item: {i}");
}
var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(1));
await client.StartAsync(cts.Token).ConfigureAwait(false); // blocks the UI
Console.ReadLine();
public class ChannelExample
{
private readonly WebSocket _webSocket;
private readonly Channel<string> _input;
private readonly Channel<string> _output;
public ChannelExample(WebSocket webSocket)
{
_webSocket = webSocket ?? throw new ArgumentNullException(nameof(webSocket));
_input = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
{
SingleWriter = true
});
_output = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
{
SingleReader = true
});
}
public ChannelReader<string> Input => _input.Reader;
public ChannelWriter<string> Output => _output.Writer;
public async Task StartAsync(CancellationToken cancellationToken)
{
var receiving = ReceiveLoopAsync(cancellationToken);
var sending = SendLoopAsync(cancellationToken);
var completedTask = await Task.WhenAny(receiving, sending).ConfigureAwait(false);
if (completedTask.Exception != null)
{
Console.WriteLine("Exception");
}
}
private async Task SendLoopAsync(CancellationToken cancellationToken)
{
await foreach (var message in _output.Reader.ReadAllAsync(cancellationToken))
{
Console.WriteLine($"Sending: {message}");
await Task.Delay(5000, cancellationToken).ConfigureAwait(false);
}
}
private async Task ReceiveLoopAsync(CancellationToken cancellationToken)
{
using var buffer = MemoryPool<byte>.Shared.Rent();
while (_webSocket.State == WebSocketState.Open && !cancellationToken.IsCancellationRequested)
{
ValueWebSocketReceiveResult receiveResult;
do
{
receiveResult = await _webSocket.ReceiveAsync(buffer.Memory, cancellationToken).ConfigureAwait(false);
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
return;
}
} while (!receiveResult.EndOfMessage);
}
}
}
我怀疑它会抛出;当然,您始终可以对其进行测试,但是 - 这是这种情况下的一般预期模式。所以你会用一个包裹它:
try
{
// ...
}
catch (OperationCancelledException) when (cancellationToken.IsCancellationRequested)
{
// treat as completion; swallow
}
或者:您可以将 CancellationToken.None
传递到通道读取 API,然后只使用编写器的完成来表示退出(确保在退出时对编写器调用 .Complete(...)
).
也就是说:ReadAllAsync
可能不是这里的首选 API,因为你并不真的 需要 它,因为 IAsyncEnumerable<T>
- 所以最好使用本地频道 API,即
while (await _output.Reader.WaitToReadAsync(cancellationToken))
{
while (_output.Reader.TryRead(out var message))
{
// ...
}
}
我不确定 StartAsync
返回的 Task
代表什么:
public async Task StartAsync(CancellationToken cancellationToken)
{
var receiving = ReceiveLoopAsync(cancellationToken);
var sending = SendLoopAsync(cancellationToken);
var completedTask = await Task.WhenAny(receiving, sending).ConfigureAwait(false);
if (completedTask.Exception != null)
{
Console.WriteLine("Exception");
}
}
好像代表了receiving
和sending
任一个任务的完成,很奇怪。这很可能是尝试记录任务异常的意外结果。有比这更好的记录任务异常的方法,最简单的方法是将异步方法中的所有代码封装在 try
/catch
块中。除此之外,Exception
property of a Task
is not-null only when the task IsFaulted
, not when it IsCanceled
.