由于异步问题,并行 HttpClient 请求超时?

Parallel HttpClient requests timing out due to async problem?

我是 运行 一种使用 System.Threading.Tasks.Parallel.ForEach 同步并行的方法。在该方法的最后,它需要发出几十个 HTTP POST 请求,这些请求彼此不依赖。由于我使用的是 .NET Framework 4.6.2,System.Net.Http.HttpClient 完全是异步的,所以我使用 Nito.AsyncEx.AsyncContext 来避免死锁,形式为:

public static void MakeMultipleRequests(IEnumerable<MyClass> enumerable)
{
    AsyncContext.Run(async () => await Task.WhenAll(enumerable.Select(async c => 
        await getResultsFor(c).ConfigureAwait(false))));
}

getResultsFor(MyClass c) 方法然后创建一个 HttpRequestMessage 并使用:

发送它
await httpClient.SendAsync(request);

然后解析响应并在 MyClass 的实例上设置相关字段。

我的理解是,同步线程将阻塞在 AsyncContext.Run(...),而许多任务由 AsyncContext 拥有的单个 AsyncContextThread 异步执行。当它们都完成后,同步线程就会解除阻塞。

这适用于几百个请求,但当它在五分钟内扩展到几千个时,一些请求开始从服务器返回 HTTP 408 Request Timeout 错误。我的日志表明这些超时发生在峰值负载时,此时发送的请求最多,并且超时发生在收到许多其他请求后很久。

我认为问题在于任务正在 await 内部服务器握手 HttpClient,但它们没有按照 FIFO 顺序继续,所以当它们继续时,握手已经过期.但是,除了使用 System.Threading.SemaphoreSlim 强制一次只能执行一个任务 await httpClient.SendAsync(...) 之外,我想不出任何方法来解决这个问题。

我的应用程序非常大,将其完全转换为异步是不可行的。

这不是在阻塞之前包装任务可以完成的事情。对于初学者来说,如果请求通过,您可能最终会破坏服务器。现在你正在攻击客户。 .NET Framework 中每个域有 2 个并发请求限制,可以放宽,但如果将其设置得太高,则可能最终导致服务器崩溃。

您可以通过在管道中使用 DataFlow 块以固定的并行度执行请求然后解析它们来解决此问题。假设您有一个名为 MyPayload 的 class,在 属性:

中有很多 Items
ServicePointManager.DefaultConnectionLimit = 1000;

var options=new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 10
};

var downloader=new TransformBlock<string,MyPayload>(async url=>{
    var json=await _client.GetStringAsync(url);
    var data=JsonConvert.DeserializeObject<MyPayload>(json);
    return data;
},options);

var importer=new ActionBlock<MyPayload>(async data=>
{
    var items=data.Items;
    
    using(var connection=new SqlConnection(connectionString))
    using(var bcp=new SqlBulkCopy(connection))
    using(var reader=ObjectReader.Create(items))
    {
        bcp.DestinationTableName = destination;
        connection.Open();

        await bcp.WriteToServerAsync(reader);
    }
});


downloader.LinkTo(importer,new DataflowLinkOptions { 
    PropagateCompletion=true
});

我正在使用 FastMember's ObjectReader 将项目包装在可用于将记录批量插入数据库的 DbDataReader 中。

有了这个管道后,您就可以开始将 URL 发布到头部块,downloader :

foreach(var url in hugeList)
{
    downloader.Post(url);
}
downloader.Complete();

发布所有 URL 后,您告诉 donwloader 完成并等待管道中的最后一个块完成:

await importer.Completion;

首先,Nito.AsyncEx.AsyncContext将在线程池线程上执行;如 documentation.

中所述,要避免所描述的死锁,需要一个 Nito.AsyncEx.AsyncContextThread 的实例

有两种可能的原因:

  • .NET Framework 4.6.2 System.Net.Http.HttpClient 中的错误
  • 问题中概述的继续优先级问题,其中个别请求没有及时继续,因此超时。

this answer and its comments, from a similar question 所述,使用自定义 TaskScheduler 可能可以解决优先级问题,但使用信号量限制并发请求数可能是最佳答案:

using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Nito.AsyncEx;

public class MyClass 
{
    private static readonly AsyncContextThread asyncContextThread
        = new AsyncContextThread();
    private static readonly HttpClient httpClient = new HttpClient();
    private static readonly SemaphoreSlim semaphore = new SemaphoreSlim(10);

    public HttpRequestMessage Request { get; set; }
    public HttpResponseMessage Response { get; private set; }
        
    private async Task GetResponseAsync()
    {
        await semaphore.WaitAsync();
        try
        {
            Response = await httpClient.SendAsync(Request);
        }
        finally
        {
            semaphore.Release();
        }
    }

    public static void MakeMultipleRequests(IEnumerable<MyClass> enumerable)
    {
        Task.WaitAll(enumerable.Select(c =>
            asyncContextThread.Factory.Run(() =>
                c.GetResponseAsync())).ToArray());
    }
}

编辑为使用 AsyncContextThread 按预期在非线程池线程上执行异步代码。 AsyncContext 不会自行执行此操作。