由于异步问题,并行 HttpClient 请求超时?
Parallel HttpClient requests timing out due to async problem?
我是 运行 一种使用 System.Threading.Tasks.Parallel.ForEach
同步并行的方法。在该方法的最后,它需要发出几十个 HTTP POST
请求,这些请求彼此不依赖。由于我使用的是 .NET Framework 4.6.2,System.Net.Http.HttpClient
完全是异步的,所以我使用 Nito.AsyncEx.AsyncContext
来避免死锁,形式为:
public static void MakeMultipleRequests(IEnumerable<MyClass> enumerable)
{
AsyncContext.Run(async () => await Task.WhenAll(enumerable.Select(async c =>
await getResultsFor(c).ConfigureAwait(false))));
}
getResultsFor(MyClass c)
方法然后创建一个 HttpRequestMessage
并使用:
发送它
await httpClient.SendAsync(request);
然后解析响应并在 MyClass 的实例上设置相关字段。
我的理解是,同步线程将阻塞在 AsyncContext.Run(...)
,而许多任务由 AsyncContext
拥有的单个 AsyncContextThread
异步执行。当它们都完成后,同步线程就会解除阻塞。
这适用于几百个请求,但当它在五分钟内扩展到几千个时,一些请求开始从服务器返回 HTTP 408 Request Timeout
错误。我的日志表明这些超时发生在峰值负载时,此时发送的请求最多,并且超时发生在收到许多其他请求后很久。
我认为问题在于任务正在 await
内部服务器握手 HttpClient
,但它们没有按照 FIFO 顺序继续,所以当它们继续时,握手已经过期.但是,除了使用 System.Threading.SemaphoreSlim
强制一次只能执行一个任务 await httpClient.SendAsync(...)
之外,我想不出任何方法来解决这个问题。
我的应用程序非常大,将其完全转换为异步是不可行的。
这不是在阻塞之前包装任务可以完成的事情。对于初学者来说,如果请求通过,您可能最终会破坏服务器。现在你正在攻击客户。 .NET Framework 中每个域有 2 个并发请求限制,可以放宽,但如果将其设置得太高,则可能最终导致服务器崩溃。
您可以通过在管道中使用 DataFlow 块以固定的并行度执行请求然后解析它们来解决此问题。假设您有一个名为 MyPayload
的 class,在 属性:
中有很多 Items
ServicePointManager.DefaultConnectionLimit = 1000;
var options=new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 10
};
var downloader=new TransformBlock<string,MyPayload>(async url=>{
var json=await _client.GetStringAsync(url);
var data=JsonConvert.DeserializeObject<MyPayload>(json);
return data;
},options);
var importer=new ActionBlock<MyPayload>(async data=>
{
var items=data.Items;
using(var connection=new SqlConnection(connectionString))
using(var bcp=new SqlBulkCopy(connection))
using(var reader=ObjectReader.Create(items))
{
bcp.DestinationTableName = destination;
connection.Open();
await bcp.WriteToServerAsync(reader);
}
});
downloader.LinkTo(importer,new DataflowLinkOptions {
PropagateCompletion=true
});
我正在使用 FastMember's ObjectReader 将项目包装在可用于将记录批量插入数据库的 DbDataReader 中。
有了这个管道后,您就可以开始将 URL 发布到头部块,downloader
:
foreach(var url in hugeList)
{
downloader.Post(url);
}
downloader.Complete();
发布所有 URL 后,您告诉 donwloader
完成并等待管道中的最后一个块完成:
await importer.Completion;
首先,Nito.AsyncEx.AsyncContext
将在线程池线程上执行;如 documentation.
中所述,要避免所描述的死锁,需要一个 Nito.AsyncEx.AsyncContextThread
的实例
有两种可能的原因:
- .NET Framework 4.6.2
System.Net.Http.HttpClient
中的错误
- 问题中概述的继续优先级问题,其中个别请求没有及时继续,因此超时。
如 this answer and its comments, from a similar question 所述,使用自定义 TaskScheduler
可能可以解决优先级问题,但使用信号量限制并发请求数可能是最佳答案:
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Nito.AsyncEx;
public class MyClass
{
private static readonly AsyncContextThread asyncContextThread
= new AsyncContextThread();
private static readonly HttpClient httpClient = new HttpClient();
private static readonly SemaphoreSlim semaphore = new SemaphoreSlim(10);
public HttpRequestMessage Request { get; set; }
public HttpResponseMessage Response { get; private set; }
private async Task GetResponseAsync()
{
await semaphore.WaitAsync();
try
{
Response = await httpClient.SendAsync(Request);
}
finally
{
semaphore.Release();
}
}
public static void MakeMultipleRequests(IEnumerable<MyClass> enumerable)
{
Task.WaitAll(enumerable.Select(c =>
asyncContextThread.Factory.Run(() =>
c.GetResponseAsync())).ToArray());
}
}
编辑为使用 AsyncContextThread
按预期在非线程池线程上执行异步代码。 AsyncContext
不会自行执行此操作。
我是 运行 一种使用 System.Threading.Tasks.Parallel.ForEach
同步并行的方法。在该方法的最后,它需要发出几十个 HTTP POST
请求,这些请求彼此不依赖。由于我使用的是 .NET Framework 4.6.2,System.Net.Http.HttpClient
完全是异步的,所以我使用 Nito.AsyncEx.AsyncContext
来避免死锁,形式为:
public static void MakeMultipleRequests(IEnumerable<MyClass> enumerable)
{
AsyncContext.Run(async () => await Task.WhenAll(enumerable.Select(async c =>
await getResultsFor(c).ConfigureAwait(false))));
}
getResultsFor(MyClass c)
方法然后创建一个 HttpRequestMessage
并使用:
await httpClient.SendAsync(request);
然后解析响应并在 MyClass 的实例上设置相关字段。
我的理解是,同步线程将阻塞在 AsyncContext.Run(...)
,而许多任务由 AsyncContext
拥有的单个 AsyncContextThread
异步执行。当它们都完成后,同步线程就会解除阻塞。
这适用于几百个请求,但当它在五分钟内扩展到几千个时,一些请求开始从服务器返回 HTTP 408 Request Timeout
错误。我的日志表明这些超时发生在峰值负载时,此时发送的请求最多,并且超时发生在收到许多其他请求后很久。
我认为问题在于任务正在 await
内部服务器握手 HttpClient
,但它们没有按照 FIFO 顺序继续,所以当它们继续时,握手已经过期.但是,除了使用 System.Threading.SemaphoreSlim
强制一次只能执行一个任务 await httpClient.SendAsync(...)
之外,我想不出任何方法来解决这个问题。
我的应用程序非常大,将其完全转换为异步是不可行的。
这不是在阻塞之前包装任务可以完成的事情。对于初学者来说,如果请求通过,您可能最终会破坏服务器。现在你正在攻击客户。 .NET Framework 中每个域有 2 个并发请求限制,可以放宽,但如果将其设置得太高,则可能最终导致服务器崩溃。
您可以通过在管道中使用 DataFlow 块以固定的并行度执行请求然后解析它们来解决此问题。假设您有一个名为 MyPayload
的 class,在 属性:
Items
ServicePointManager.DefaultConnectionLimit = 1000;
var options=new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 10
};
var downloader=new TransformBlock<string,MyPayload>(async url=>{
var json=await _client.GetStringAsync(url);
var data=JsonConvert.DeserializeObject<MyPayload>(json);
return data;
},options);
var importer=new ActionBlock<MyPayload>(async data=>
{
var items=data.Items;
using(var connection=new SqlConnection(connectionString))
using(var bcp=new SqlBulkCopy(connection))
using(var reader=ObjectReader.Create(items))
{
bcp.DestinationTableName = destination;
connection.Open();
await bcp.WriteToServerAsync(reader);
}
});
downloader.LinkTo(importer,new DataflowLinkOptions {
PropagateCompletion=true
});
我正在使用 FastMember's ObjectReader 将项目包装在可用于将记录批量插入数据库的 DbDataReader 中。
有了这个管道后,您就可以开始将 URL 发布到头部块,downloader
:
foreach(var url in hugeList)
{
downloader.Post(url);
}
downloader.Complete();
发布所有 URL 后,您告诉 donwloader
完成并等待管道中的最后一个块完成:
await importer.Completion;
首先,Nito.AsyncEx.AsyncContext
将在线程池线程上执行;如 documentation.
Nito.AsyncEx.AsyncContextThread
的实例
有两种可能的原因:
- .NET Framework 4.6.2
System.Net.Http.HttpClient
中的错误 - 问题中概述的继续优先级问题,其中个别请求没有及时继续,因此超时。
如 this answer and its comments, from a similar question 所述,使用自定义 TaskScheduler
可能可以解决优先级问题,但使用信号量限制并发请求数可能是最佳答案:
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Nito.AsyncEx;
public class MyClass
{
private static readonly AsyncContextThread asyncContextThread
= new AsyncContextThread();
private static readonly HttpClient httpClient = new HttpClient();
private static readonly SemaphoreSlim semaphore = new SemaphoreSlim(10);
public HttpRequestMessage Request { get; set; }
public HttpResponseMessage Response { get; private set; }
private async Task GetResponseAsync()
{
await semaphore.WaitAsync();
try
{
Response = await httpClient.SendAsync(Request);
}
finally
{
semaphore.Release();
}
}
public static void MakeMultipleRequests(IEnumerable<MyClass> enumerable)
{
Task.WaitAll(enumerable.Select(c =>
asyncContextThread.Factory.Run(() =>
c.GetResponseAsync())).ToArray());
}
}
编辑为使用 AsyncContextThread
按预期在非线程池线程上执行异步代码。 AsyncContext
不会自行执行此操作。