如何为短而重复的动作编写超时?
How to write a timeout for short and repeatedly action?
我想为 RabbitMQ C# 客户端的 BasicPublish 方法编写一个超时函数。出于多种原因,有时队列会被阻塞,或者 rabbit 宕机等等。但我想立即检测发布何时失败。我不想出于任何原因阻止该站点。
我担心通过任务或线程实现超时会增加简单发布的开销,我们在生产中这样做了数百万次。
有没有人知道如何在像 BasicPublish 这样的快速阻塞方法上编写快速超时?
澄清:我也在 .Net 4 中工作,我没有异步。
我会说最简单的方法是使用任务/取消令牌。你认为这是开销吗?
public static async Task WithTimeoutAfterStart(
Func<CancellationToken, Task> operation, TimeSpan timeout)
{
var source = new CancellationTokenSource();
var task = operation(source.Token);
source.CancelAfter(timeout);
await task;
}
用法:
await WithTimeoutAfterStart(
ct => SomeOperationAsync(ct), TimeSpan.FromMilliseconds(n));
Polly TimeoutPolicy 正是针对这种情况。
Polly 的 TimeoutStrategy.Optimistic
接近于@ThiagoCustodio 的答案,但它也正确处理了 CancellationTokenSource
。然而,RabbitMQ 的 C# 客户端(在撰写本文时)不提供 BasicPublish()
重载 CancellationToken
,因此这种方法不相关。
Polly 的 TimeoutStrategy.Pessimistic
针对 BasicPublish()
等场景,在这种情况下,您想要对 没有 CancellationToken
支持。
波莉的 TimeoutStrategy.Pessimistic
:
[1] 允许调用线程在执行时超时(离开等待),即使执行的委托不支持取消也是如此。
[2] 以额外 task/thread 为代价(同步执行),并为您管理。
[3] 也 captures the timed-out Task
(the task you have walked away from). This can be valuable for logging, and is essential to avoid UnobservedTaskExceptions - particularly in .NET4.0, where an UnobservedTaskException can bring down your entire process.
简单示例:
Policy.Timeout(TimeSpan.FromSeconds(10), TimeoutStrategy.Pessimistic).Execute(() => BasicPublish(...));
正确避免 UnobservedTaskException
s 的完整示例:
Policy timeoutPolicy = Policy.Timeout(TimeSpan.FromSeconds(10), TimeoutStrategy.Pessimistic, (context, timespan, task) =>
{
task.ContinueWith(t => { // ContinueWith important!: the abandoned task may very well still be executing, when the caller times out on waiting for it!
if (t.IsFaulted)
{
logger.Error($"{context.PolicyKey} at {context.ExecutionKey}: execution timed out after {timespan.TotalSeconds} seconds, eventually terminated with: {t.Exception}.");
}
else
{
// extra logic (if desired) for tasks which complete, despite the caller having 'walked away' earlier due to timeout.
}
});
});
timeoutPolicy.Execute(() => BasicPublish(...));
为了避免在 RabbitMQ 变得不可用的情况下建立太多并发挂起 tasks/threads,您可以使用 Bulkhead Isolation policy to limit parallelization and/or a CircuitBreaker to prevent putting calls through for a period, once you detect a certain level of failures. These can be combined with the TimeoutPolicy using PolicyWrap
.
我想为 RabbitMQ C# 客户端的 BasicPublish 方法编写一个超时函数。出于多种原因,有时队列会被阻塞,或者 rabbit 宕机等等。但我想立即检测发布何时失败。我不想出于任何原因阻止该站点。
我担心通过任务或线程实现超时会增加简单发布的开销,我们在生产中这样做了数百万次。
有没有人知道如何在像 BasicPublish 这样的快速阻塞方法上编写快速超时?
澄清:我也在 .Net 4 中工作,我没有异步。
我会说最简单的方法是使用任务/取消令牌。你认为这是开销吗?
public static async Task WithTimeoutAfterStart(
Func<CancellationToken, Task> operation, TimeSpan timeout)
{
var source = new CancellationTokenSource();
var task = operation(source.Token);
source.CancelAfter(timeout);
await task;
}
用法:
await WithTimeoutAfterStart(
ct => SomeOperationAsync(ct), TimeSpan.FromMilliseconds(n));
Polly TimeoutPolicy 正是针对这种情况。
Polly 的 TimeoutStrategy.Optimistic
接近于@ThiagoCustodio 的答案,但它也正确处理了 CancellationTokenSource
。然而,RabbitMQ 的 C# 客户端(在撰写本文时)不提供 BasicPublish()
重载 CancellationToken
,因此这种方法不相关。
Polly 的 TimeoutStrategy.Pessimistic
针对 BasicPublish()
等场景,在这种情况下,您想要对 没有 CancellationToken
支持。
波莉的 TimeoutStrategy.Pessimistic
:
[1] 允许调用线程在执行时超时(离开等待),即使执行的委托不支持取消也是如此。
[2] 以额外 task/thread 为代价(同步执行),并为您管理。
[3] 也 captures the timed-out Task
(the task you have walked away from). This can be valuable for logging, and is essential to avoid UnobservedTaskExceptions - particularly in .NET4.0, where an UnobservedTaskException can bring down your entire process.
简单示例:
Policy.Timeout(TimeSpan.FromSeconds(10), TimeoutStrategy.Pessimistic).Execute(() => BasicPublish(...));
正确避免 UnobservedTaskException
s 的完整示例:
Policy timeoutPolicy = Policy.Timeout(TimeSpan.FromSeconds(10), TimeoutStrategy.Pessimistic, (context, timespan, task) =>
{
task.ContinueWith(t => { // ContinueWith important!: the abandoned task may very well still be executing, when the caller times out on waiting for it!
if (t.IsFaulted)
{
logger.Error($"{context.PolicyKey} at {context.ExecutionKey}: execution timed out after {timespan.TotalSeconds} seconds, eventually terminated with: {t.Exception}.");
}
else
{
// extra logic (if desired) for tasks which complete, despite the caller having 'walked away' earlier due to timeout.
}
});
});
timeoutPolicy.Execute(() => BasicPublish(...));
为了避免在 RabbitMQ 变得不可用的情况下建立太多并发挂起 tasks/threads,您可以使用 Bulkhead Isolation policy to limit parallelization and/or a CircuitBreaker to prevent putting calls through for a period, once you detect a certain level of failures. These can be combined with the TimeoutPolicy using PolicyWrap
.