使用 Polly 进行重连和超时
Using Polly to do reconnects and timeouts
我在尝试完成以下操作时遇到问题:
重新连接逻辑 - 我试图创建一个 Polly 策略,当您尝试在没有 Internet 连接的情况下执行 StartAsync
时,它会起作用。然而,当它达到 ReceiveLoop
时,该策略不再影响该方法,如果我们的连接在该点停止,它永远不会尝试重新连接。它只是抛出以下异常:Disconnected: The remote party closed the WebSocket connection without completing the close handshake.
。也许我应该制定两项政策:一项在 StartAsync
中,一项在 ReceiveLoop
中,但出于某种原因,它对我来说并不合适,所以这就是我问这个问题的原因。
超时 - 我想为每个 ClientWebSocket
方法调用添加超时,例如ConnectAsync、SendAsync 等。我对 Polly 不太熟悉,但我相信此策略会自动为我们完成这些操作。但是,我需要有人来确认这一点。超时,我的意思是类似_webSocket.ConnectAsync(_url, CancellationToken.None).TimeoutAfter(timeoutMilliseconds)
的逻辑,TimeoutAfter的实现可以参考here. An example how other repos did it can be found here.
简化,我想让这个 class 有弹性,这意味着与其尝试连接到死的 web 套接字服务器 30 秒但没有成功,不管是什么原因,它应该快速失败 - > 在 10 秒内重试 -> 快速失败 -> 再试一次,依此类推。这个等待和重试逻辑应该重复,直到我们调用StopAsync
或释放实例。
您可以在 GitHub 上找到 WebSocketDuplexPipe class。
public sealed class Client : IDisposable
{
private const int RetrySeconds = 10;
private readonly WebSocketDuplexPipe _webSocketPipe;
private readonly string _url;
public Client(string url)
{
_url = url;
_webSocketPipe = new WebSocketDuplexPipe();
}
public Task StartAsync(CancellationToken cancellationToken = default)
{
var retryPolicy = Policy
.Handle<Exception>(e => !cancellationToken.IsCancellationRequested)
.WaitAndRetryForeverAsync(_ => TimeSpan.FromSeconds(RetrySeconds),
(exception, calculatedWaitDuration) =>
{
Console.WriteLine($"{exception.Message}. Retry in {calculatedWaitDuration.TotalSeconds} seconds.");
});
return retryPolicy.ExecuteAsync(async () =>
{
await _webSocketPipe.StartAsync(_url, cancellationToken).ConfigureAwait(false);
_ = ReceiveLoop();
});
}
public Task StopAsync()
{
return _webSocketPipe.StopAsync();
}
public async Task SendAsync(string data, CancellationToken cancellationToken = default)
{
var encoded = Encoding.UTF8.GetBytes(data);
var bufferSend = new ArraySegment<byte>(encoded, 0, encoded.Length);
await _webSocketPipe.Output.WriteAsync(bufferSend, cancellationToken).ConfigureAwait(false);
}
private async Task ReceiveLoop()
{
var input = _webSocketPipe.Input;
try
{
while (true)
{
var result = await input.ReadAsync().ConfigureAwait(false);
var buffer = result.Buffer;
try
{
if (result.IsCanceled)
{
break;
}
if (!buffer.IsEmpty)
{
while (MessageParser.TryParse(ref buffer, out var payload))
{
var message = Encoding.UTF8.GetString(payload);
_messageReceivedSubject.OnNext(message);
}
}
if (result.IsCompleted)
{
break;
}
}
finally
{
input.AdvanceTo(buffer.Start, buffer.End);
}
}
}
catch (Exception ex)
{
Console.WriteLine($"Disconnected: {ex.Message}");
}
}
}
让我通过评论在回答中捕捉我们谈话的本质。
ReceiveLoop
重试
在您等待 input.ReadAsync
完成时,您的重试策略将从 ExecuteAsync
成功退出。原因是你不是在等待 ReceiveLoop
,而是你只是以火烧火燎的方式开始它。
换句话说,您的重试逻辑将仅适用于 StartAsync
和 ReceiveLoop
.
中等待之前的代码。
修复方法是将重试逻辑移至 ReceiveLoop
。
超时
Polly 的超时策略可以使用乐观或悲观策略。 The former one 严重依赖 CancellationToken
.
- 因此,如果您将
CancellationToken.None
传递给 ExecuteAsync
,那么您基本上就是说让 TimeoutPolicy 处理取消过程。
- 如果你传递一个已经存在的令牌,那么装饰任务可以是
由 TimeoutPolicy 或提供的令牌取消。
请记住它会抛出 TimeoutRejectedException
而不是 OperationCanceledException
。
onTimeoutAsync
TimeoutAsync
有几个重载可以接受两个 onTimeoutAsync
代表之一
Func<Context, TimeSpan, Task, Task> onTimeoutAsync
或
Func<Context, TimeSpan, Task, Exception, Task> onTimeoutAsync
如果您有在 TimeoutRejectedException
.
上触发的外部策略(例如重试),那么记录超时已发生的事实可能很有用
链接策略
我建议使用 Policy.WrapAsync
静态方法而不是 AsyncPolicy
的 WrapAsync
实例方法。
var timeoutPolicy = Policy.TimeoutAsync(TimeSpan.FromMilliseconds(timeoutMs), TimeoutStrategy.Optimistic,
(context, timeSpan, task, ex) =>
{
Console.WriteLine($"Timeout {timeSpan.TotalSeconds} seconds");
return Task.CompletedTask;
});
var retryPolicy = Policy
.Handle<Exception>(ex =>
{
Console.WriteLine($"Exception tralal: {ex.Message}");
return true;
})
.WaitAndRetryForeverAsync(_ => TimeSpan.FromMilliseconds(retryBackOffMs),
(ex, retryCount, calculatedWaitDuration) =>
{
Console.WriteLine(
$"Retrying in {calculatedWaitDuration.TotalSeconds} seconds (Reason: {ex.Message}) (Retry count: {retryCount})");
});
var resilientStrategy = Policy.WrapAsync(retryPolicy, timeoutPolicy);
使用这种方法,您的重试策略定义不会明确引用超时策略。相反,您有两个单独的策略和一个链接的策略。
我在尝试完成以下操作时遇到问题:
重新连接逻辑 - 我试图创建一个 Polly 策略,当您尝试在没有 Internet 连接的情况下执行
StartAsync
时,它会起作用。然而,当它达到ReceiveLoop
时,该策略不再影响该方法,如果我们的连接在该点停止,它永远不会尝试重新连接。它只是抛出以下异常:Disconnected: The remote party closed the WebSocket connection without completing the close handshake.
。也许我应该制定两项政策:一项在StartAsync
中,一项在ReceiveLoop
中,但出于某种原因,它对我来说并不合适,所以这就是我问这个问题的原因。超时 - 我想为每个
ClientWebSocket
方法调用添加超时,例如ConnectAsync、SendAsync 等。我对 Polly 不太熟悉,但我相信此策略会自动为我们完成这些操作。但是,我需要有人来确认这一点。超时,我的意思是类似_webSocket.ConnectAsync(_url, CancellationToken.None).TimeoutAfter(timeoutMilliseconds)
的逻辑,TimeoutAfter的实现可以参考here. An example how other repos did it can be found here.
简化,我想让这个 class 有弹性,这意味着与其尝试连接到死的 web 套接字服务器 30 秒但没有成功,不管是什么原因,它应该快速失败 - > 在 10 秒内重试 -> 快速失败 -> 再试一次,依此类推。这个等待和重试逻辑应该重复,直到我们调用StopAsync
或释放实例。
您可以在 GitHub 上找到 WebSocketDuplexPipe class。
public sealed class Client : IDisposable
{
private const int RetrySeconds = 10;
private readonly WebSocketDuplexPipe _webSocketPipe;
private readonly string _url;
public Client(string url)
{
_url = url;
_webSocketPipe = new WebSocketDuplexPipe();
}
public Task StartAsync(CancellationToken cancellationToken = default)
{
var retryPolicy = Policy
.Handle<Exception>(e => !cancellationToken.IsCancellationRequested)
.WaitAndRetryForeverAsync(_ => TimeSpan.FromSeconds(RetrySeconds),
(exception, calculatedWaitDuration) =>
{
Console.WriteLine($"{exception.Message}. Retry in {calculatedWaitDuration.TotalSeconds} seconds.");
});
return retryPolicy.ExecuteAsync(async () =>
{
await _webSocketPipe.StartAsync(_url, cancellationToken).ConfigureAwait(false);
_ = ReceiveLoop();
});
}
public Task StopAsync()
{
return _webSocketPipe.StopAsync();
}
public async Task SendAsync(string data, CancellationToken cancellationToken = default)
{
var encoded = Encoding.UTF8.GetBytes(data);
var bufferSend = new ArraySegment<byte>(encoded, 0, encoded.Length);
await _webSocketPipe.Output.WriteAsync(bufferSend, cancellationToken).ConfigureAwait(false);
}
private async Task ReceiveLoop()
{
var input = _webSocketPipe.Input;
try
{
while (true)
{
var result = await input.ReadAsync().ConfigureAwait(false);
var buffer = result.Buffer;
try
{
if (result.IsCanceled)
{
break;
}
if (!buffer.IsEmpty)
{
while (MessageParser.TryParse(ref buffer, out var payload))
{
var message = Encoding.UTF8.GetString(payload);
_messageReceivedSubject.OnNext(message);
}
}
if (result.IsCompleted)
{
break;
}
}
finally
{
input.AdvanceTo(buffer.Start, buffer.End);
}
}
}
catch (Exception ex)
{
Console.WriteLine($"Disconnected: {ex.Message}");
}
}
}
让我通过评论在回答中捕捉我们谈话的本质。
ReceiveLoop
重试
在您等待 input.ReadAsync
完成时,您的重试策略将从 ExecuteAsync
成功退出。原因是你不是在等待 ReceiveLoop
,而是你只是以火烧火燎的方式开始它。
换句话说,您的重试逻辑将仅适用于 StartAsync
和 ReceiveLoop
.
修复方法是将重试逻辑移至 ReceiveLoop
。
超时
Polly 的超时策略可以使用乐观或悲观策略。 The former one 严重依赖 CancellationToken
.
- 因此,如果您将
CancellationToken.None
传递给ExecuteAsync
,那么您基本上就是说让 TimeoutPolicy 处理取消过程。 - 如果你传递一个已经存在的令牌,那么装饰任务可以是 由 TimeoutPolicy 或提供的令牌取消。
请记住它会抛出 TimeoutRejectedException
而不是 OperationCanceledException
。
onTimeoutAsync
TimeoutAsync
有几个重载可以接受两个 onTimeoutAsync
代表之一
Func<Context, TimeSpan, Task, Task> onTimeoutAsync
或
Func<Context, TimeSpan, Task, Exception, Task> onTimeoutAsync
如果您有在 TimeoutRejectedException
.
链接策略
我建议使用 Policy.WrapAsync
静态方法而不是 AsyncPolicy
的 WrapAsync
实例方法。
var timeoutPolicy = Policy.TimeoutAsync(TimeSpan.FromMilliseconds(timeoutMs), TimeoutStrategy.Optimistic,
(context, timeSpan, task, ex) =>
{
Console.WriteLine($"Timeout {timeSpan.TotalSeconds} seconds");
return Task.CompletedTask;
});
var retryPolicy = Policy
.Handle<Exception>(ex =>
{
Console.WriteLine($"Exception tralal: {ex.Message}");
return true;
})
.WaitAndRetryForeverAsync(_ => TimeSpan.FromMilliseconds(retryBackOffMs),
(ex, retryCount, calculatedWaitDuration) =>
{
Console.WriteLine(
$"Retrying in {calculatedWaitDuration.TotalSeconds} seconds (Reason: {ex.Message}) (Retry count: {retryCount})");
});
var resilientStrategy = Policy.WrapAsync(retryPolicy, timeoutPolicy);
使用这种方法,您的重试策略定义不会明确引用超时策略。相反,您有两个单独的策略和一个链接的策略。