由于 Task.WhenAny,Polly 未处理任务中的异常

Polly doesn't handle an exception in a task because of Task.WhenAny

当我们的连接断开时,ReceiveAsync 正在抛出 WebSocketException (ex.WebSocketErrorCode == WebSocketError.ConnectionClosedPrematurely)。

问题是由于某种原因,Polly 没有处理它。我相信它不会处理它,因为它在一个单独的 Task 中,尽管我正在做 Task.WhenAny.

预期的行为是在抛出 WebSocketException 时触发重新连接。

public sealed class ChannelWebSocketClient : IDisposable
{
    private readonly Uri _uri;
    private readonly ILogger<ChannelWebSocketClient> _logger;
    private readonly Channel<string> _output;
    private CancellationTokenSource? _cancellationTokenSource;

    public ChannelWebSocketClient(Uri uri, ILoggerFactory loggerFactory)
    {
        _uri = uri ?? throw new ArgumentNullException(nameof(uri));
        _logger = loggerFactory.CreateLogger<ChannelWebSocketClient>();

        _output = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
        {
            SingleReader = true,
            SingleWriter = false
        });
    }

    public void Dispose()
    {
        _output.Writer.TryComplete();
    }

    public Task StartAsync()
    {
        return Policy.Handle<Exception>(ex => ex is not (TaskCanceledException or OperationCanceledException))
            .WaitAndRetryForeverAsync(
                (_, _) => TimeSpan.FromSeconds(5),
                (ex, retryCount, calculatedWaitDuration, _) => { _logger.LogError(ex, "Unable to connect to the web socket server. Retry count: {RetryCount} | Retry in {Seconds} seconds", retryCount, calculatedWaitDuration.TotalSeconds); })
            .ExecuteAsync(ConnectAsync);
    }

    public void Stop()
    {
        _cancellationTokenSource?.Cancel();
    }

    private async Task ConnectAsync()
    {
        _logger.LogDebug("Connecting");

        using var ws = new ClientWebSocket();

        // WebSocketException, TaskCanceledException
        await ws.ConnectAsync(_uri, CancellationToken.None).ConfigureAwait(false);

        _logger.LogDebug("Connected to {Host}", _uri.AbsoluteUri);

        _cancellationTokenSource = new CancellationTokenSource();

        var receiving = ReceiveLoopAsync(ws, _cancellationTokenSource.Token);
        var sending = SendLoopAsync(ws, _cancellationTokenSource.Token);

        var trigger = await Task.WhenAny(receiving, sending).ConfigureAwait(false);

        if (trigger == receiving)
        {
            _cancellationTokenSource?.Cancel();

            await sending.ConfigureAwait(false);
        }

        _logger.LogDebug("END");
    }

    public async Task SendAsync(string message)
    {
        await _output.Writer.WriteAsync(message, CancellationToken.None).ConfigureAwait(false);
    }

    private async Task SendLoopAsync(WebSocket webSocket, CancellationToken cancellationToken)
    {
        _logger.LogDebug("SendLoopAsync BEGIN");

        try
        {
            while (await _output.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
            {
                while (_output.Reader.TryRead(out var message))
                {
                    // WebSocketException, TaskCanceledException, ObjectDisposedException
                    await webSocket.SendAsync(new ArraySegment<byte>(Encoding.UTF8.GetBytes(message)),
                        WebSocketMessageType.Text, true, cancellationToken).ConfigureAwait(false);
                }
            }
        }
        catch (OperationCanceledException)
        {
        }
        finally
        {
            _logger.LogDebug("SendLoopAsync END");
        }
    }

    private async Task ReceiveLoopAsync(WebSocket webSocket, CancellationToken cancellationToken)
    {
        _logger.LogDebug("ReceiveLoopAsync BEGIN");

        try
        {
            while (true)
            {
                ValueWebSocketReceiveResult receiveResult;

                using var buffer = MemoryPool<byte>.Shared.Rent(4096);
                await using var ms = new MemoryStream(buffer.Memory.Length);
                do
                {
                    // WebSocketException, TaskCanceledException, ObjectDisposedException
                    receiveResult = await webSocket.ReceiveAsync(buffer.Memory, cancellationToken).ConfigureAwait(false);

                    if (receiveResult.MessageType == WebSocketMessageType.Close)
                    {
                        break;
                    }

                    await ms.WriteAsync(buffer.Memory[..receiveResult.Count], cancellationToken).ConfigureAwait(false);
                } while (!receiveResult.EndOfMessage);

                ms.Seek(0, SeekOrigin.Begin);

                if (receiveResult.MessageType == WebSocketMessageType.Text)
                {
                    using var reader = new StreamReader(ms, Encoding.UTF8);
                    var message = await reader.ReadToEndAsync().ConfigureAwait(false);

                    _logger.LogInformation("Message received: {Message}", message);
                }
                else if (receiveResult.MessageType == WebSocketMessageType.Close)
                {
                    break;
                }
            }
        }
        catch (WebSocketException ex) when (ex.WebSocketErrorCode == WebSocketError.ConnectionClosedPrematurely)
        {
            _logger.LogError(ex, "");
            throw;
        }
        finally
        {
            _logger.LogDebug("ReceiveLoopAsync END");
        }
    }
}

Task.WhenAnyTask.WhenAll.

  • Former throws exception 是任何因异常而失败的任务
  • 即使所有任务都失败,后者也不会抛出异常

所以要么你使用 call two 两次 .GetAwaiter().GetResult() 因为 WhenAny returns a Task<Task>

Task.WhenAny(receiving, sending).ConfigureAwait(false)
   .GetAwaiter().GetResult()
   .GetAwaiter().GetResult();

或者你可以re-throw例外

var trigger = await Task.WhenAny(receiving, sending).ConfigureAwait(false);
if (trigger.Exception != null)
{
    throw trigger.Exception;
}

None 这些解决方案是完美的,但它们会触发您的政策。


更新#1

正如 Monsieur Merso 指出的,您可以调用两次 await

await await Task.WhenAny(receiving, sending).ConfigureAwait(false);

这比上面两种方法要好得多。


更新 #2

如果你想

  • 如果更快的任务失败则触发策略
  • 或者想知道哪一个完成得更快并成功

那么你就可以“避免”双重await

var trigger = await Task.WhenAny(receiving, sending).ConfigureAwait(false); 

await trigger; //Throws exception if the faster Task has failed

if (trigger == receiving) //Determines which Task finished sooner
{

}