使用 Polly 进行重连和超时

Using Polly to do reconnects and timeouts

我在尝试完成以下操作时遇到问题:

简化,我想让这个 class 有弹性,这意味着与其尝试连接到死的 web 套接字服务器 30 秒但没有成功,不管是什么原因,它应该快速失败 - > 在 10 秒内重试 -> 快速失败 -> 再试一次,依此类推。这个等待和重试逻辑应该重复,直到我们调用StopAsync或释放实例。

您可以在 GitHub 上找到 WebSocketDuplexPipe class。

public sealed class Client : IDisposable
{
    private const int RetrySeconds = 10;
    private readonly WebSocketDuplexPipe _webSocketPipe;
    private readonly string _url;

    public Client(string url)
    {
        _url = url;
        _webSocketPipe = new WebSocketDuplexPipe();
    }

    public Task StartAsync(CancellationToken cancellationToken = default)
    {
        var retryPolicy = Policy
            .Handle<Exception>(e => !cancellationToken.IsCancellationRequested)
            .WaitAndRetryForeverAsync(_ => TimeSpan.FromSeconds(RetrySeconds),
                (exception, calculatedWaitDuration) =>
                {
                    Console.WriteLine($"{exception.Message}. Retry in {calculatedWaitDuration.TotalSeconds} seconds.");
                });

        return retryPolicy.ExecuteAsync(async () =>
        {
            await _webSocketPipe.StartAsync(_url, cancellationToken).ConfigureAwait(false);
            _ = ReceiveLoop();
        });
    }

    public Task StopAsync()
    {
        return _webSocketPipe.StopAsync();
    }

    public async Task SendAsync(string data, CancellationToken cancellationToken = default)
    {
        var encoded = Encoding.UTF8.GetBytes(data);
        var bufferSend = new ArraySegment<byte>(encoded, 0, encoded.Length);
        await _webSocketPipe.Output.WriteAsync(bufferSend, cancellationToken).ConfigureAwait(false);
    }

    private async Task ReceiveLoop()
    {
        var input = _webSocketPipe.Input;

        try
        {
            while (true)
            {
                var result = await input.ReadAsync().ConfigureAwait(false);
                var buffer = result.Buffer;

                try
                {
                    if (result.IsCanceled)
                    {
                        break;
                    }

                    if (!buffer.IsEmpty)
                    {
                        while (MessageParser.TryParse(ref buffer, out var payload))
                        {
                            var message = Encoding.UTF8.GetString(payload);

                            _messageReceivedSubject.OnNext(message);
                        }
                    }

                    if (result.IsCompleted)
                    {
                        break;
                    }
                }
                finally
                {
                    input.AdvanceTo(buffer.Start, buffer.End);
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Disconnected: {ex.Message}");
        }
    }
}

让我通过评论在回答中捕捉我们谈话的本质。

ReceiveLoop 重试

在您等待 input.ReadAsync 完成时,您的重试策略将从 ExecuteAsync 成功退出。原因是你不是在等待 ReceiveLoop,而是你只是以火烧火燎的方式开始它。

换句话说,您的重试逻辑将仅适用于 StartAsyncReceiveLoop.

中等待之前的代码。

修复方法是将重试逻辑移至 ReceiveLoop

超时

Polly 的超时策略可以使用乐观或悲观策略。 The former one 严重依赖 CancellationToken.

  • 因此,如果您将 CancellationToken.None 传递给 ExecuteAsync,那么您基本上就是说让 TimeoutPolicy 处理取消过程。
  • 如果你传递一个已经存在的令牌,那么装饰任务可以是 由 TimeoutPolicy 或提供的令牌取消。

请记住它会抛出 TimeoutRejectedException 而不是 OperationCanceledException

onTimeoutAsync

TimeoutAsync 有几个重载可以接受两个 onTimeoutAsync 代表之一

Func<Context, TimeSpan, Task, Task> onTimeoutAsync

Func<Context, TimeSpan, Task, Exception, Task> onTimeoutAsync

如果您有在 TimeoutRejectedException.

上触发的外部策略(例如重试),那么记录超时已发生的事实可能很有用

链接策略

我建议使用 Policy.WrapAsync 静态方法而不是 AsyncPolicyWrapAsync 实例方法。

var timeoutPolicy = Policy.TimeoutAsync(TimeSpan.FromMilliseconds(timeoutMs), TimeoutStrategy.Optimistic,
    (context, timeSpan, task, ex) =>
    {
        Console.WriteLine($"Timeout {timeSpan.TotalSeconds} seconds");
        return Task.CompletedTask;
    });

var retryPolicy = Policy
    .Handle<Exception>(ex =>
    {
        Console.WriteLine($"Exception tralal: {ex.Message}");
        return true;
    })
    .WaitAndRetryForeverAsync(_ => TimeSpan.FromMilliseconds(retryBackOffMs),
    (ex, retryCount, calculatedWaitDuration) =>
    {
        Console.WriteLine(
            $"Retrying in {calculatedWaitDuration.TotalSeconds} seconds (Reason: {ex.Message}) (Retry count: {retryCount})");
    });

var resilientStrategy = Policy.WrapAsync(retryPolicy, timeoutPolicy);

使用这种方法,您的重试策略定义不会明确引用超时策略。相反,您有两个单独的策略和一个链接的策略。