WaitAndRetryPolicy 结合 BulkheadPolicy,优先重试。可能吗?

WaitAndRetryPolicy combined with BulkheadPolicy, prioritizing retries. Is it possible?

我正在评估 Polly 库的功能和灵活性,作为评估过程的一部分,我试图将 WaitAndRetryPolicyBulkheadPolicy 策略结合起来,以实现弹性和节流的结合。问题是这种组合的结果行为不符合我的期望和偏好。我想要的是优先重试失败的操作而不是执行 fresh/unprocessed 操作。

理由是(根据我的经验)失败的操作再次失败的可能性更大。因此,如果所有失败的操作都被推到整个过程的末尾,那么整个过程的最后一部分将非常缓慢且效率低下。不仅因为这些操作可能会再次失败,而且因为每次重试之间所需的延迟,每次失败尝试后可能需要逐渐延长。所以我想要的是每次 BulkheadPolicy 有空间开始一个新操作时,如果它的队列中有一个操作,就选择一个重试操作。

这是一个演示我想要修复的不良行为的示例。需要处理 10 个项目。第一次尝试都失败了,第二次尝试成功了,总共执行了 20 次。重试项目之前的等待时间为一秒。任何时候都应该只有 2 个操作处于活动状态:

var policy = Policy.WrapAsync
(
    Policy
        .Handle<HttpRequestException>()
        .WaitAndRetryAsync(retryCount: 1, _ => TimeSpan.FromSeconds(1)),

    Policy.BulkheadAsync(
        maxParallelization: 2, maxQueuingActions: Int32.MaxValue)
);

var tasks = new List<Task>();
foreach (var item in Enumerable.Range(1, 10))
{
    int attempt = 0;
    tasks.Add(policy.ExecuteAsync(async () =>
    {
        attempt++;
        Console.WriteLine($"{DateTime.Now:HH:mm:ss} Starting #{item}/{attempt}");
        await Task.Delay(1000);
        if (attempt == 1) throw new HttpRequestException();
    }));
}
await Task.WhenAll(tasks);

输出(实际):

09:07:12 Starting #1/1
09:07:12 Starting #2/1
09:07:13 Starting #3/1
09:07:13 Starting #4/1
09:07:14 Starting #5/1
09:07:14 Starting #6/1
09:07:15 Starting #8/1
09:07:15 Starting #7/1
09:07:16 Starting #10/1
09:07:16 Starting #9/1
09:07:17 Starting #2/2
09:07:17 Starting #1/2
09:07:18 Starting #4/2
09:07:18 Starting #3/2
09:07:19 Starting #5/2
09:07:19 Starting #6/2
09:07:20 Starting #7/2
09:07:20 Starting #8/2
09:07:21 Starting #10/2
09:07:21 Starting #9/2

预期的输出应该是这样的(我手写的):

09:07:12 Starting #1/1
09:07:12 Starting #2/1
09:07:13 Starting #3/1
09:07:13 Starting #4/1
09:07:14 Starting #1/2
09:07:14 Starting #2/2
09:07:15 Starting #3/2
09:07:15 Starting #4/2
09:07:16 Starting #5/1
09:07:16 Starting #6/1
09:07:17 Starting #7/1
09:07:17 Starting #8/1
09:07:18 Starting #5/2
09:07:18 Starting #6/2
09:07:19 Starting #7/2
09:07:19 Starting #8/2
09:07:20 Starting #9/1
09:07:20 Starting #10/1
09:07:22 Starting #9/2
09:07:22 Starting #10/2

例如,在 09:07:14 标记处,失败项目 #1 的 1 秒等待期已过期,因此其第二次尝试应优先于项目 #5 的第一次尝试。

解决这个问题的一个不成功的尝试是颠倒这两个策略的顺序。不幸的是,将 BulkheadPolicy 放在 WaitAndRetryPolicy 之前会导致并行化减少。发生的情况是 BulkheadPolicy 将项目的所有重试视为单一操作,因此两次重试之间的“等待”阶段计入并行化限制。显然我不想这样。 documentation 也明确了我例子中两个策略的顺序是正确的:

BulkheadPolicy: Usually innermost unless wraps a final TimeoutPolicy. Certainly inside any WaitAndRetry. The Bulkhead intentionally limits the parallelization. You want that parallelization devoted to running the delegate, not occupied by waits for a retry.

有什么方法可以实现我想要的行为,同时保持在 Polly 库的范围内?

我找到了一个简单但不完美的解决这个问题的方法。解决方案是在 WaitAndRetryPolicy 之前添加第二个 BulkheadPolicy(在“外部”位置)。这个额外的 Bulkhead 将仅用于重新确定工作负载的优先级(通过充当外部队列),并且应该具有比控制并行化的内部 Bulkhead 大得多的容量(x10 或更多)。原因是外部 Bulkhead 也可能以不可预测的方式影响(减少)并行化,我们不希望这样。这就是为什么我认为这个解决方案不完美,因为既不是优先级是最优的,也不能保证并行化不会受到影响。

这是原始示例的组合策略,通过外部 BulkheadPolicy 进行了增强。它的容量只有 2.5 倍大,适合这个人为的例子,但对于一般情况来说太小了:

var policy = Policy.WrapAsync
(
    Policy.BulkheadAsync( // For improving prioritization
        maxParallelization: 5, maxQueuingActions: Int32.MaxValue),

    Policy
        .Handle<HttpRequestException>()
        .WaitAndRetryAsync(retryCount: 1, _ => TimeSpan.FromSeconds(1)),

    Policy.BulkheadAsync( // For controlling paralellization
        maxParallelization: 2, maxQueuingActions: Int32.MaxValue)
);

这是执行的输出:

12:36:02 Starting #1/1
12:36:02 Starting #2/1
12:36:03 Starting #3/1
12:36:03 Starting #4/1
12:36:04 Starting #2/2
12:36:04 Starting #5/1
12:36:05 Starting #1/2
12:36:05 Starting #3/2
12:36:06 Starting #6/1
12:36:06 Starting #4/2
12:36:07 Starting #8/1
12:36:07 Starting #5/2
12:36:08 Starting #9/1
12:36:08 Starting #7/1
12:36:09 Starting #10/1
12:36:09 Starting #6/2
12:36:10 Starting #7/2
12:36:10 Starting #8/2
12:36:11 Starting #9/2
12:36:11 Starting #10/2

虽然这个解决方案并不完美,但我相信在一般情况下它应该利大于弊,并且应该会带来更好的整体性能。