HttpClient Timeout and Polly Bulkhead Policy problem
I am getting a lot of timeout exceptions while using the Polly Bulkhead policy, which helps me limit the number of concurrent calls sent to a particular host. However, the HttpClient timeout seems to apply to the whole delegate, including the time a request spends waiting in the bulkhead's queue.
I am configuring it through IHttpClientFactory with the following code:
services.AddHttpClient(string.Empty)
    .AddPolicyHandler(GetBulkheadPolicy(100));

private static IAsyncPolicy<HttpResponseMessage> GetBulkheadPolicy(int maxConcurrentRequests)
{
    return Policy.BulkheadAsync(maxConcurrentRequests, int.MaxValue)
        .AsAsyncPolicy<HttpResponseMessage>();
}
My problem is that I want the timeout to affect only the request itself, not the bulkhead policy, because the behavior I want to achieve is the following:
- Limit the number of concurrent requests sent to a particular host
- Wait indefinitely until there is capacity to send the request (Polly throws an exception when the queue is full)
- Send the request to the host and apply a timeout, e.g. the default timeout (see the sketch after this list)
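To make the intent concrete, here is a sketch of the kind of wiring I have in mind (an illustration of the desired behavior, not code I have verified): the HttpClient timeout would be disabled and a Polly timeout policy would sit inside the bulkhead, so the clock only starts once a request leaves the queue:

services.AddHttpClient(string.Empty)
    // Hand all timeout handling over to Polly, so queue time is not counted.
    .ConfigureHttpClient(client => client.Timeout = Timeout.InfiniteTimeSpan)
    .AddPolicyHandler(Policy.WrapAsync(
        // Outer: limit concurrency, queue the rest without any time limit.
        Policy.BulkheadAsync<HttpResponseMessage>(100, int.MaxValue),
        // Inner: per-request timeout, e.g. HttpClient's 100-second default.
        Policy.TimeoutAsync<HttpResponseMessage>(TimeSpan.FromSeconds(100))));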
I have already implemented this behavior with a Semaphore instead of the Polly Bulkhead policy, but I would like to encapsulate that code in a policy.
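For reference, a simplified sketch of that semaphore-based workaround (reconstructed here for illustration; names and numbers are placeholders, not my actual code):

public class SemaphoreThrottlingHandler : DelegatingHandler
{
    // Gates SendAsync: up to 100 concurrent requests, the rest wait indefinitely.
    private static readonly SemaphoreSlim semaphore = new(100);

    protected override async Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request, CancellationToken cancellationToken)
    {
        await semaphore.WaitAsync(cancellationToken);
        try
        {
            // The timeout covers only the request itself, not the wait above.
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            cts.CancelAfter(TimeSpan.FromSeconds(100));
            return await base.SendAsync(request, cts.Token);
        }
        finally
        {
            semaphore.Release();
        }
    }
}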
Thanks.
I have put together these examples to demonstrate the different options you have to throttle HttpClient requests. I must emphasize that these are just examples, far from production code, so please review them with a critical eye.
The following sample code shows how to issue requests in a fire-and-forget manner (so the callers do not care about the response). These solutions assume that the number of requests exceeds the available throughput; in other words, the producer is faster than the consumer, which is why there is some sort of queuing mechanism in place to handle the imbalance.
With BatchBlock and ActionBlock
public class ThrottlingWithBatchBlock
{
    static readonly HttpClient client = new();
    // Groups incoming requests into batches of 100 before handing them to the consumer.
    private readonly BatchBlock<HttpRequestMessage> requests = new(100);
    private ActionBlock<HttpRequestMessage[]> consumer;

    public ThrottlingWithBatchBlock()
    {
        consumer = new(
            reqs => ConsumerAsync(reqs),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 100 });
        requests.LinkTo(consumer);
    }

    public async Task IssueNewRequest(HttpRequestMessage request)
    {
        await requests.SendAsync(request);
    }

    private async Task ConsumerAsync(HttpRequestMessage[] requests)
    {
        foreach (var request in requests)
            await client.SendAsync(request).ConfigureAwait(false);
    }
}
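All of these variants (this one and the ones below) expose the same IssueNewRequest method, so a caller would use them the same way. A hypothetical fire-and-forget producer (the URL is just a placeholder):

var throttler = new ThrottlingWithBatchBlock();
for (var i = 0; i < 1_000; i++)
{
    // Enqueue and move on; the response is never observed.
    await throttler.IssueNewRequest(
        new HttpRequestMessage(HttpMethod.Get, $"https://example.com/items/{i}"));
}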
With BufferBlock
public class ThrottlingWithBufferBlock
{
    static readonly HttpClient client = new();
    // The bounded capacity makes SendAsync wait once 100 requests are queued.
    private readonly BufferBlock<HttpRequestMessage> requests = new(
        new DataflowBlockOptions { BoundedCapacity = 100 });

    public ThrottlingWithBufferBlock()
    {
        _ = ConsumerAsync();
    }

    public async Task IssueNewRequest(HttpRequestMessage request)
    {
        await requests.SendAsync(request);
    }

    async Task ConsumerAsync()
    {
        while (await requests.OutputAvailableAsync())
        {
            var request = await requests.ReceiveAsync();
            await client.SendAsync(request).ConfigureAwait(false);
        }
    }
}
With Channels
public class ThrottlingWithChannels
{
    static readonly HttpClient client = new();
    private Channel<HttpRequestMessage> requests = Channel.CreateBounded<HttpRequestMessage>(
        new BoundedChannelOptions(100) { SingleWriter = true, SingleReader = false });

    public ThrottlingWithChannels()
    {
        _ = ConsumerAsync();
    }

    public async Task IssueNewRequest(HttpRequestMessage request)
    {
        // WriteAsync itself waits for free capacity on a bounded channel,
        // so a separate WaitToWriteAsync call is not needed.
        await requests.Writer.WriteAsync(request);
    }

    async Task ConsumerAsync()
    {
        while (await requests.Reader.WaitToReadAsync())
        {
            var request = await requests.Reader.ReadAsync();
            await client.SendAsync(request).ConfigureAwait(false);
        }
    }
}
With BlockingCollection
public class ThrottlingWithBlockingCollection
{
    static readonly HttpClient client = new();
    private BlockingCollection<HttpRequestMessage> requests = new();

    public ThrottlingWithBlockingCollection()
    {
        // Start 100 long-running consumers up front.
        _ = Enumerable.Range(1, 100)
            .Select(_ => ConsumerAsync()).ToArray();
    }

    public Task IssueNewRequest(HttpRequestMessage request)
    {
        requests.Add(request);
        return Task.CompletedTask;
    }

    async Task ConsumerAsync()
    {
        while (true)
        {
            // Take blocks the calling thread until an item is available.
            var request = requests.Take();
            await client.SendAsync(request).ConfigureAwait(false);
        }
    }
}
With Parallel ForEach
public class ThrottlingWithParallelForEach
{
    static readonly HttpClient client = new();
    private BlockingCollection<HttpRequestMessage> requests = new();

    public ThrottlingWithParallelForEach()
    {
        // GetConsumingEnumerable is needed here: enumerating the collection directly
        // would only iterate a snapshot instead of consuming items as they arrive.
        _ = requests.GetConsumingEnumerable()
            .ParallelAsyncForEach(async request => await client.SendAsync(request).ConfigureAwait(false), 100);
    }

    public Task IssueNewRequest(HttpRequestMessage request)
    {
        requests.Add(request);
        return Task.CompletedTask;
    }
}
//Based on https://codereview.stackexchange.com/a/203487
public static partial class ParallelForEach
{
    public static async Task ParallelAsyncForEach<T>(this IEnumerable<T> source, Func<T, Task> body, int degreeOfParallelism)
    {
        var toBeProcessedJobs = new HashSet<Task>();
        var remainingJobsEnumerator = source.GetEnumerator();

        // Returns false once the source is exhausted, so the start-up loop
        // below cannot spin forever on a source shorter than degreeOfParallelism.
        bool AddNewJob()
        {
            if (!remainingJobsEnumerator.MoveNext())
                return false;
            toBeProcessedJobs.Add(body(remainingJobsEnumerator.Current));
            return true;
        }

        // Fill the working set up to the requested degree of parallelism.
        while (toBeProcessedJobs.Count < degreeOfParallelism && AddNewJob())
        {
        }

        // Each time a job completes, replace it with the next one.
        while (toBeProcessedJobs.Count > 0)
        {
            Task processed = await Task.WhenAny(toBeProcessedJobs).ConfigureAwait(false);
            toBeProcessedJobs.Remove(processed);
            AddNewJob();
        }
    }
}