HttpClient TimeOut 和 Polly Bulkhead Policy 问题

HttpClient TimeOut and Polly Bulkhead Policy problem

我在使用 Polly Bulkhead 策略时遇到很多超时异常,此策略帮助我限制发送到特定主机的并发调用的数量。但是,HttpClient 超时时间似乎影响了整个代表。

我正在使用 IHttpClientFactory 通过以下代码对其进行配置:

services.AddHttpClient(string.Empty)
.AddPolicyHandler(GetBulkheadPolicy(100));


private static IAsyncPolicy<HttpResponseMessage> GetBulkheadPolicy(int maxConcurrentRequests)
{
    return Policy.BulkheadAsync(maxConcurrentRequests, int.MaxValue)
        .AsAsyncPolicy<HttpResponseMessage>();
}

我的问题是我希望超时只影响请求本身而不影响舱壁策略,因为我想要实现的行为如下:

我已经使用 Semaphore 而不是 Bulkhead Polly 策略实现了该行为,但我想使用策略封装该代码。

谢谢。

我将这些示例放在一起以演示不同的选项,您如何对 HttpClient 请求执行限制。我必须强调,这些只是示例,远非生产代码,因此请透过玻璃仔细检查它们。

以下示例代码展示了如何在发生火灾和忘记时发出请求 方式(因此他们不关心响应)。这些解决方案假定请求数多于可用吞吐量。换句话说,生产者比消费者快,这就是为什么有某种排队机制来处理这种不平衡的原因。

带背部和动作积木

public class ThrottlingWithBatchBlock
{
    static readonly HttpClient client = new();
    private readonly BatchBlock<HttpRequestMessage> requests = new(100);
    private ActionBlock<HttpRequestMessage[]> consumer;

    public ThrottlingWithBatchBlock()
    {
        consumer = new(
            reqs => ConsumerAsync(reqs),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 100 });
        requests.LinkTo(consumer);
    }

    public async Task IssueNewRequest(HttpRequestMessage request)
    {
        await requests.SendAsync(request);
    }

    private async Task ConsumerAsync(HttpRequestMessage[] requests)
    {
        foreach (var request in requests)
            await client.SendAsync(request).ConfigureAwait(false);
    }
}

带缓冲块

public class ThrottlingWithBufferBlock
{
    static readonly HttpClient client = new();
    private readonly BufferBlock<HttpRequestMessage> requests = new(
            new DataflowBlockOptions { BoundedCapacity = 100 });

    public ThrottlingWithBufferBlock()
    {
        _ = ConsumerAsync();
    }

    public async Task IssueNewRequest(HttpRequestMessage request)
    {
        await requests.SendAsync(request);
    }

    async Task ConsumerAsync()
    {
        while (await requests.OutputAvailableAsync())
        {
            var request = await requests.ReceiveAsync();
            await client.SendAsync(request).ConfigureAwait(false);
        }
    }
}

有频道

public class ThrottlingWithChannels
{
    static readonly HttpClient client = new();
    private Channel<HttpRequestMessage> requests = Channel.CreateBounded<HttpRequestMessage>(
            new BoundedChannelOptions(100) { SingleWriter = true, SingleReader = false });

    public ThrottlingWithChannels()
    {
        _ = ConsumerAsync();
    }

    public async Task IssueNewRequest(HttpRequestMessage request)
    {
        await requests.Writer.WaitToWriteAsync();
        await requests.Writer.WriteAsync(request);
    }

    async Task ConsumerAsync()
    {
        while (await requests.Reader.WaitToReadAsync())
        {
            var request = await requests.Reader.ReadAsync();
            await client.SendAsync(request).ConfigureAwait(false);
        }
    }
}

使用阻塞收集

public class ThrottlingWithBlockingCollection
{
    static readonly HttpClient client = new();
    private BlockingCollection<HttpRequestMessage> requests = new();

    public ThrottlingWithBlockingCollection()
    {
        _ = Enumerable.Range(1, 100)
            .Select(_ => ConsumerAsync()).ToArray();
    }

    public Task IssueNewRequest(HttpRequestMessage request)
    {
        requests.Add(request);
        return Task.CompletedTask;
    }

    async Task ConsumerAsync()
    {
        while (true)
        {
            var request = requests.Take();
            await client.SendAsync(request).ConfigureAwait(false);
        }
    }
}

使用并行 Foreach

public class ThrottlingWithParallelForEach
{
    static readonly HttpClient client = new();
    private BlockingCollection<HttpRequestMessage> requests = new();

    public ThrottlingWithParallelForEach()
    {
        _ = requests.ParallelAsyncForEach(async request => await client.SendAsync(request).ConfigureAwait(false), 100);
    }

    public Task IssueNewRequest(HttpRequestMessage request)
    {
        requests.Add(request);
        return Task.CompletedTask;
    }
}
//Based on https://codereview.stackexchange.com/a/203487
public static partial class ParallelForEach
{
    public static async Task ParallelAsyncForEach<T>(this IEnumerable<T> source, Func<T, Task> body, int degreeOfParallelism)
    {
        var toBeProcessedJobs = new HashSet<Task>();
        var remainingJobsEnumerator = source.GetEnumerator();

        void AddNewJob()
        {
            if (remainingJobsEnumerator.MoveNext())
            {
                var readyToProcessJob = body(remainingJobsEnumerator.Current);
                toBeProcessedJobs.Add(readyToProcessJob);
            }
        }

        while (toBeProcessedJobs.Count < degreeOfParallelism)
        {
            AddNewJob();
        }

        while (toBeProcessedJobs.Count > 0)
        {
            Task processed = await Task.WhenAny(toBeProcessedJobs).ConfigureAwait(false);
            toBeProcessedJobs.Remove(processed);
            AddNewJob();
        }

        return;
    }
}