如何为短而重复的动作编写超时?

How to write a timeout for short and repeatedly action?

我想为 RabbitMQ C# 客户端的 BasicPublish 方法编写一个超时函数。出于多种原因,有时队列会被阻塞,或者 rabbit 宕机等等。但我想立即检测发布何时失败。我不想出于任何原因阻止该站点。

我担心通过任务或线程实现超时会增加简单发布的开销,我们在生产中这样做了数百万次。

有没有人知道如何在像 BasicPublish 这样的快速阻塞方法上编写快速超时?

澄清:我也在 .Net 4 中工作,我没有异步。

我会说最简单的方法是使用任务/取消令牌。你认为这是开销吗?

public static async Task WithTimeoutAfterStart(
    Func<CancellationToken, Task> operation, TimeSpan timeout)
{
    var source = new CancellationTokenSource();
    var task = operation(source.Token);
    source.CancelAfter(timeout);
    await task;
}

用法:

await WithTimeoutAfterStart(
    ct => SomeOperationAsync(ct), TimeSpan.FromMilliseconds(n));

Polly TimeoutPolicy 正是针对这种情况。

Polly 的 TimeoutStrategy.Optimistic 接近于@ThiagoCustodio 的答案,但它也正确处理了 CancellationTokenSource。然而,RabbitMQ 的 C# 客户端(在撰写本文时)不提供 BasicPublish() 重载 CancellationToken,因此这种方法不相关。

Polly 的 TimeoutStrategy.Pessimistic 针对 BasicPublish() 等场景,在这种情况下,您想要对 没有 CancellationToken 支持。

波莉的 TimeoutStrategy.Pessimistic:

[1] 允许调用线程在执行时超时(离开等待),即使执行的委托不支持取消也是如此。

[2] 以额外 task/thread 为代价(同步执行),并为您管理。

[3] 也 captures the timed-out Task (the task you have walked away from). This can be valuable for logging, and is essential to avoid UnobservedTaskExceptions - particularly in .NET4.0, where an UnobservedTaskException can bring down your entire process.

简单示例:

Policy.Timeout(TimeSpan.FromSeconds(10), TimeoutStrategy.Pessimistic).Execute(() => BasicPublish(...));

正确避免 UnobservedTaskExceptions 的完整示例:

Policy timeoutPolicy = Policy.Timeout(TimeSpan.FromSeconds(10), TimeoutStrategy.Pessimistic, (context, timespan, task) => 
{
    task.ContinueWith(t => { // ContinueWith important!: the abandoned task may very well still be executing, when the caller times out on waiting for it! 
        if (t.IsFaulted) 
        {
            logger.Error($"{context.PolicyKey} at {context.ExecutionKey}: execution timed out after {timespan.TotalSeconds} seconds, eventually terminated with: {t.Exception}.");
        }
        else
        {
           // extra logic (if desired) for tasks which complete, despite the caller having 'walked away' earlier due to timeout.
        }
    });
});

timeoutPolicy.Execute(() => BasicPublish(...));

为了避免在 RabbitMQ 变得不可用的情况下建立太多并发挂起 tasks/threads,您可以使用 Bulkhead Isolation policy to limit parallelization and/or a CircuitBreaker to prevent putting calls through for a period, once you detect a certain level of failures. These can be combined with the TimeoutPolicy using PolicyWrap.