Polly CircuitBreaker 回退不起作用
Polly CircuitBreaker fallback not working
我有以下政策:
var retryPolicy = Policy.Handle<Exception>(e => (e is HttpRequestException || e.InnerException is HttpRequestException)).WaitAndRetry(
retryCount: maxRetryCount,
sleepDurationProvider: attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),
onRetry: (exception, calculatedWaitDuration, retryCount, context) =>
{
Log.Error($"Retry => Count: {retryCount}, Wait duration: {calculatedWaitDuration}, Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}, Exception: {exception}.");
});
var circuitBreaker = Policy.Handle<Exception>(e => (e is HttpRequestException || e.InnerException is HttpRequestException)).CircuitBreaker(maxExceptionsBeforeBreaking, TimeSpan.FromSeconds(circuitBreakDurationSeconds), onBreak, onReset);
var sharedBulkhead = Policy.Bulkhead(maxParallelizations, maxQueuingActions, onBulkheadRejected);
var fallbackForCircuitBreaker = Policy<bool>
.Handle<BrokenCircuitException>()
.Fallback(
fallbackValue: false,
onFallback: (b, context) =>
{
Log.Error($"Operation attempted on broken circuit => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}");
}
);
var fallbackForAnyException = Policy<bool>
.Handle<Exception>()
.Fallback(
fallbackAction: (context) => { return false; },
onFallback: (e, context) =>
{
Log.Error($"An unexpected error occured => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}");
}
);
var resilienceStrategy = Policy.Wrap(retryPolicy, circuitBreaker, sharedBulkhead);
var policyWrap = fallbackForAnyException.Wrap(fallbackForCircuitBreaker.Wrap(resilienceStrategy));
public bool CallApi(ChangeMapModel changeMessage)
{
var httpClient = new HttpClient();
var endPoint = changeMessage.EndPoint;
var headers = endPoint.Headers;
if (headers != null)
{
foreach (var header in headers)
{
if (header.Contains(':'))
{
var splitHeader = header.Split(':');
httpClient.DefaultRequestHeaders.Add(splitHeader[0], splitHeader[1]);
}
}
}
var res = httpClient.PostAsync(endPoint.Uri, null);
var response = res.Result;
response.EnsureSuccessStatusCode();
return true;
}
我这样执行政策:
policyWrap.Execute((context) => CallApi(changeMessage), new Context(endPoint));
问题是在开路执行操作时我没有在 CircuitBreaker 回调中命中。
我希望通过策略进行 API 调用,要处理的异常类型为 HttpRequestException
。政策定义有问题吗?为什么不调用断路器回退?
我创建了以下 minimum, complete, verifiable example 来帮助探索问题:
注:不一定是成品;只是对发布的代码和额外注释进行了一些小修改,以帮助探索这个问题。
using Polly;
using Polly.CircuitBreaker;
using System;
using System.Net.Http;
using System.Threading.Tasks;
public class Program
{
public static void Main()
{
int maxRetryCount = 6;
double circuitBreakDurationSeconds = 0.2 /* experiment with effect of shorter or longer here, eg: change to = 1, and the fallbackForCircuitBreaker is correctly invoked */ ;
int maxExceptionsBeforeBreaking = 4; /* experiment with effect of fewer here, eg change to = 1, and the fallbackForCircuitBreaker is correctly invoked */
int maxParallelizations = 2;
int maxQueuingActions = 2;
var retryPolicy = Policy.Handle<Exception>(e => (e is HttpRequestException || (/*!(e is BrokenCircuitException) &&*/ e.InnerException is HttpRequestException))) // experiment with introducing the extra (!(e is BrokenCircuitException) && ) clause here, if necessary/desired, depending on goal
.WaitAndRetry(
retryCount: maxRetryCount,
sleepDurationProvider: attempt => TimeSpan.FromMilliseconds(50 * Math.Pow(2, attempt)),
onRetry: (ex, calculatedWaitDuration, retryCount, context) =>
{
Console.WriteLine(String.Format("Retry => Count: {0}, Wait duration: {1}, Policy Wrap: {2}, Policy: {3}, Endpoint: {4}, Exception: {5}", retryCount, calculatedWaitDuration, context.PolicyWrapKey, context.PolicyKey, context.OperationKey, ex.Message));
});
var circuitBreaker = Policy.Handle<Exception>(e => (e is HttpRequestException || e.InnerException is HttpRequestException))
.CircuitBreaker(maxExceptionsBeforeBreaking,
TimeSpan.FromSeconds(circuitBreakDurationSeconds),
onBreak: (ex, breakDuration) => {
Console.WriteLine(String.Format("Circuit breaking for {0} ms due to {1}", breakDuration.TotalMilliseconds, ex.Message));
},
onReset: () => {
Console.WriteLine("Circuit closed again.");
},
onHalfOpen: () => { Console.WriteLine("Half open."); });
var sharedBulkhead = Policy.Bulkhead(maxParallelizations, maxQueuingActions);
var fallbackForCircuitBreaker = Policy<bool>
.Handle<BrokenCircuitException>()
/* .OrInner<BrokenCircuitException>() */ // Consider this if necessary.
/* .Or<Exception>(e => circuitBreaker.State != CircuitState.Closed) */ // This check will also detect the circuit in anything but healthy state, regardless of the final exception thrown.
.Fallback(
fallbackValue: false,
onFallback: (b, context) =>
{
Console.WriteLine(String.Format("Operation attempted on broken circuit => Policy Wrap: {0}, Policy: {1}, Endpoint: {2}", context.PolicyWrapKey, context.PolicyKey, context.OperationKey));
}
);
var fallbackForAnyException = Policy<bool>
.Handle<Exception>()
.Fallback<bool>(
fallbackAction: (context) => { return false; },
onFallback: (e, context) =>
{
Console.WriteLine(String.Format("An unexpected error occured => Policy Wrap: {0}, Policy: {1}, Endpoint: {2}, Exception: {3}", context.PolicyWrapKey, context.PolicyKey, context.OperationKey, e.Exception.Message));
}
);
var resilienceStrategy = Policy.Wrap(retryPolicy, circuitBreaker, sharedBulkhead);
var policyWrap = fallbackForAnyException.Wrap(fallbackForCircuitBreaker.Wrap(resilienceStrategy));
bool outcome = policyWrap.Execute((context) => CallApi("http://www.doesnotexistattimeofwriting.com/"), new Context("some endpoint info"));
}
public static bool CallApi(string uri)
{
using (var httpClient = new HttpClient() { Timeout = TimeSpan.FromSeconds(1) }) // Consider HttpClient lifetimes and disposal; this pattern is for minimum change from original posted code, not a recommendation.
{
Task<HttpResponseMessage> res = httpClient.GetAsync(uri);
var response = res.Result; // Consider async/await rather than blocking on the returned Task.
response.EnsureSuccessStatusCode();
return true;
}
}
}
不止一个因素可能导致 fallbackForCircuitBreaker
无法调用:
circuitBreakDurationSeconds
可能设置得比各种尝试和重试之间等待的总时间要短。
如果是这样,电路可能会恢复到半开状态。在 half-open state or closed state 中,导致电路中断的异常按原样重新抛出。 BrokenCircuitException
仅在(完全)开路阻止尝试调用时抛出。
因此,如果您的电路在重试耗尽时恢复到半开状态,则返回回退策略的异常将是 HttpRequestException
,而不是 BrokenCircuitException
。
.Handle<Exception>(e => (e is HttpRequestException || e.InnerException is HttpRequestException))
子句可能会捕获具有 InnerException is HttpRequestException
的 CircuitBreakerException
一个CircuitBreakerException
contains the exception which caused the circuit to break as its InnerException
。因此,对 e.InnerException is HttpRequestException
的过度 greedy/looser 检查也可能捕获具有 InnerException is HttpRequestException
的 CircuitBreakerException
。这可能需要也可能不需要,具体取决于您的目标。
我相信原始发布的代码不会发生这种情况,因为它的构造方式特殊。阻塞 HttpClient.DoSomethingAsync(...)
返回的 Task
已经导致 AggregateException->HttpRequestException
,这意味着结果 CircuitBreakerException
嵌套 HttpRequestException
两层深度:
CircuitBreakerException -> AggregateException -> HttpRequestException
所以这不属于发布代码中的 one-深度检查。但是,请注意 意识到 CircuitBreakerException
包含导致电路中断的异常,因为其 InnerException
。这可能会导致 handle 子句检查 only e.InnerException is HttpRequestException
到 unwantedly (出乎意料,如果这不是你的目标)重试 CircuitBreakerException
,如果:
(a) 代码更改为 async
/await
,这将删除 AggregateException
,从而导致嵌套只有一层
(b) 代码更改为 Polly 的 .HandleInner<HttpRequestException>()
syntax,它是递归贪婪的,因此会捕获嵌套两层 CircuitBreakerException->AggregateException->HttpRequestException
.
上面代码中的建议 /* commented out */ // with additional explanation
建议如何调整发布的代码,以便 fallbackForCircuitBreaker
按预期调用。
另外两个想法:
- 如果可能,请考虑更改为
async
/await
。
通过调用 .Result
阻塞 HttpClient.DoSomethingAsync()
可能会影响性能,或者如果与其他异步代码混合会导致死锁风险,并且会引入整个 AggregateException
-with-InnerException
痛.
- 考虑
HttpClient
个实例的处置和生命周期。
(如其他地方广泛讨论的那样,特意将第 3 点和第 4 点保持简短。)
我有以下政策:
var retryPolicy = Policy.Handle<Exception>(e => (e is HttpRequestException || e.InnerException is HttpRequestException)).WaitAndRetry(
retryCount: maxRetryCount,
sleepDurationProvider: attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),
onRetry: (exception, calculatedWaitDuration, retryCount, context) =>
{
Log.Error($"Retry => Count: {retryCount}, Wait duration: {calculatedWaitDuration}, Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}, Exception: {exception}.");
});
var circuitBreaker = Policy.Handle<Exception>(e => (e is HttpRequestException || e.InnerException is HttpRequestException)).CircuitBreaker(maxExceptionsBeforeBreaking, TimeSpan.FromSeconds(circuitBreakDurationSeconds), onBreak, onReset);
var sharedBulkhead = Policy.Bulkhead(maxParallelizations, maxQueuingActions, onBulkheadRejected);
var fallbackForCircuitBreaker = Policy<bool>
.Handle<BrokenCircuitException>()
.Fallback(
fallbackValue: false,
onFallback: (b, context) =>
{
Log.Error($"Operation attempted on broken circuit => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}");
}
);
var fallbackForAnyException = Policy<bool>
.Handle<Exception>()
.Fallback(
fallbackAction: (context) => { return false; },
onFallback: (e, context) =>
{
Log.Error($"An unexpected error occured => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}");
}
);
var resilienceStrategy = Policy.Wrap(retryPolicy, circuitBreaker, sharedBulkhead);
var policyWrap = fallbackForAnyException.Wrap(fallbackForCircuitBreaker.Wrap(resilienceStrategy));
public bool CallApi(ChangeMapModel changeMessage)
{
var httpClient = new HttpClient();
var endPoint = changeMessage.EndPoint;
var headers = endPoint.Headers;
if (headers != null)
{
foreach (var header in headers)
{
if (header.Contains(':'))
{
var splitHeader = header.Split(':');
httpClient.DefaultRequestHeaders.Add(splitHeader[0], splitHeader[1]);
}
}
}
var res = httpClient.PostAsync(endPoint.Uri, null);
var response = res.Result;
response.EnsureSuccessStatusCode();
return true;
}
我这样执行政策:
policyWrap.Execute((context) => CallApi(changeMessage), new Context(endPoint));
问题是在开路执行操作时我没有在 CircuitBreaker 回调中命中。
我希望通过策略进行 API 调用,要处理的异常类型为 HttpRequestException
。政策定义有问题吗?为什么不调用断路器回退?
我创建了以下 minimum, complete, verifiable example 来帮助探索问题:
注:不一定是成品;只是对发布的代码和额外注释进行了一些小修改,以帮助探索这个问题。
using Polly;
using Polly.CircuitBreaker;
using System;
using System.Net.Http;
using System.Threading.Tasks;
public class Program
{
public static void Main()
{
int maxRetryCount = 6;
double circuitBreakDurationSeconds = 0.2 /* experiment with effect of shorter or longer here, eg: change to = 1, and the fallbackForCircuitBreaker is correctly invoked */ ;
int maxExceptionsBeforeBreaking = 4; /* experiment with effect of fewer here, eg change to = 1, and the fallbackForCircuitBreaker is correctly invoked */
int maxParallelizations = 2;
int maxQueuingActions = 2;
var retryPolicy = Policy.Handle<Exception>(e => (e is HttpRequestException || (/*!(e is BrokenCircuitException) &&*/ e.InnerException is HttpRequestException))) // experiment with introducing the extra (!(e is BrokenCircuitException) && ) clause here, if necessary/desired, depending on goal
.WaitAndRetry(
retryCount: maxRetryCount,
sleepDurationProvider: attempt => TimeSpan.FromMilliseconds(50 * Math.Pow(2, attempt)),
onRetry: (ex, calculatedWaitDuration, retryCount, context) =>
{
Console.WriteLine(String.Format("Retry => Count: {0}, Wait duration: {1}, Policy Wrap: {2}, Policy: {3}, Endpoint: {4}, Exception: {5}", retryCount, calculatedWaitDuration, context.PolicyWrapKey, context.PolicyKey, context.OperationKey, ex.Message));
});
var circuitBreaker = Policy.Handle<Exception>(e => (e is HttpRequestException || e.InnerException is HttpRequestException))
.CircuitBreaker(maxExceptionsBeforeBreaking,
TimeSpan.FromSeconds(circuitBreakDurationSeconds),
onBreak: (ex, breakDuration) => {
Console.WriteLine(String.Format("Circuit breaking for {0} ms due to {1}", breakDuration.TotalMilliseconds, ex.Message));
},
onReset: () => {
Console.WriteLine("Circuit closed again.");
},
onHalfOpen: () => { Console.WriteLine("Half open."); });
var sharedBulkhead = Policy.Bulkhead(maxParallelizations, maxQueuingActions);
var fallbackForCircuitBreaker = Policy<bool>
.Handle<BrokenCircuitException>()
/* .OrInner<BrokenCircuitException>() */ // Consider this if necessary.
/* .Or<Exception>(e => circuitBreaker.State != CircuitState.Closed) */ // This check will also detect the circuit in anything but healthy state, regardless of the final exception thrown.
.Fallback(
fallbackValue: false,
onFallback: (b, context) =>
{
Console.WriteLine(String.Format("Operation attempted on broken circuit => Policy Wrap: {0}, Policy: {1}, Endpoint: {2}", context.PolicyWrapKey, context.PolicyKey, context.OperationKey));
}
);
var fallbackForAnyException = Policy<bool>
.Handle<Exception>()
.Fallback<bool>(
fallbackAction: (context) => { return false; },
onFallback: (e, context) =>
{
Console.WriteLine(String.Format("An unexpected error occured => Policy Wrap: {0}, Policy: {1}, Endpoint: {2}, Exception: {3}", context.PolicyWrapKey, context.PolicyKey, context.OperationKey, e.Exception.Message));
}
);
var resilienceStrategy = Policy.Wrap(retryPolicy, circuitBreaker, sharedBulkhead);
var policyWrap = fallbackForAnyException.Wrap(fallbackForCircuitBreaker.Wrap(resilienceStrategy));
bool outcome = policyWrap.Execute((context) => CallApi("http://www.doesnotexistattimeofwriting.com/"), new Context("some endpoint info"));
}
public static bool CallApi(string uri)
{
using (var httpClient = new HttpClient() { Timeout = TimeSpan.FromSeconds(1) }) // Consider HttpClient lifetimes and disposal; this pattern is for minimum change from original posted code, not a recommendation.
{
Task<HttpResponseMessage> res = httpClient.GetAsync(uri);
var response = res.Result; // Consider async/await rather than blocking on the returned Task.
response.EnsureSuccessStatusCode();
return true;
}
}
}
不止一个因素可能导致 fallbackForCircuitBreaker
无法调用:
circuitBreakDurationSeconds
可能设置得比各种尝试和重试之间等待的总时间要短。
如果是这样,电路可能会恢复到半开状态。在 half-open state or closed state 中,导致电路中断的异常按原样重新抛出。 BrokenCircuitException
仅在(完全)开路阻止尝试调用时抛出。
因此,如果您的电路在重试耗尽时恢复到半开状态,则返回回退策略的异常将是 HttpRequestException
,而不是 BrokenCircuitException
。
.Handle<Exception>(e => (e is HttpRequestException || e.InnerException is HttpRequestException))
子句可能会捕获具有InnerException is HttpRequestException
的
CircuitBreakerException
一个CircuitBreakerException
contains the exception which caused the circuit to break as its InnerException
。因此,对 e.InnerException is HttpRequestException
的过度 greedy/looser 检查也可能捕获具有 InnerException is HttpRequestException
的 CircuitBreakerException
。这可能需要也可能不需要,具体取决于您的目标。
我相信原始发布的代码不会发生这种情况,因为它的构造方式特殊。阻塞 HttpClient.DoSomethingAsync(...)
返回的 Task
已经导致 AggregateException->HttpRequestException
,这意味着结果 CircuitBreakerException
嵌套 HttpRequestException
两层深度:
CircuitBreakerException -> AggregateException -> HttpRequestException
所以这不属于发布代码中的 one-深度检查。但是,请注意 意识到 CircuitBreakerException
包含导致电路中断的异常,因为其 InnerException
。这可能会导致 handle 子句检查 only e.InnerException is HttpRequestException
到 unwantedly (出乎意料,如果这不是你的目标)重试 CircuitBreakerException
,如果:
(a) 代码更改为 async
/await
,这将删除 AggregateException
,从而导致嵌套只有一层
(b) 代码更改为 Polly 的 .HandleInner<HttpRequestException>()
syntax,它是递归贪婪的,因此会捕获嵌套两层 CircuitBreakerException->AggregateException->HttpRequestException
.
上面代码中的建议 /* commented out */ // with additional explanation
建议如何调整发布的代码,以便 fallbackForCircuitBreaker
按预期调用。
另外两个想法:
- 如果可能,请考虑更改为
async
/await
。
通过调用 .Result
阻塞 HttpClient.DoSomethingAsync()
可能会影响性能,或者如果与其他异步代码混合会导致死锁风险,并且会引入整个 AggregateException
-with-InnerException
痛.
- 考虑
HttpClient
个实例的处置和生命周期。
(如其他地方广泛讨论的那样,特意将第 3 点和第 4 点保持简短。)