如何使多个 API 调用更快?

How to make multiple API calls faster?

我正在请求某种产品的数据 API,但问题是我得到的数据是 20 x 20。所以端点看起来像这样:

https://www.someDummyAPI.com/Api/Products?offset=0&count=20

注意:我无法更改计数,它始终是 20。

即来自该端点的数据将包含 20 条记录,从 0 到 20,之后我必须将偏移量增加 20 以获得下一个 20 条记录,依此类推(总共大约 1500 条记录,所以我必须发出大约 700 条请求)。

获取所有数据后,我使用存储过程将其插入 SQL 数据库(这是不同的过程)。

所以我的问题是,我怎样才能加快获取过程,我考虑过 运行 并行任务,但我需要从响应中获得结果。

现在这个过程看起来像这样:

    protected async void FSL_Sync_btn_Click(object sender, EventArgs e)
    {
        int offset = 0;
        int total= 0;
        bool isFirst = true;
        DataTable resTbl = CreateDt();
        while (offset < total || offset == 0)
        {
            try
            {
                var data = await GetFSLData(offset.ToString(),"Products");

                JObject Jresult = JObject.Parse(data);

                if (isFirst)
                {
                    Int32.TryParse(Jresult.SelectToken("total").ToString(),out total);
                    isFirst = false;
                }
                // Function to chain up data in DataTable
                resTbl = WriteInDataTable(resTbl, Jresult);

                offset += 20;
            }
            catch(Exception ex)
            {
                var msg = ex.Message;
            }
        }
    }

所以我采用的流程是:

  1. 从 API 获取数据(比方说前 20 条记录)。
  2. 使用 WriteInDataTable 函数添加两个现有的 DataTable
  3. resTbl Datatable 中将数据插入 SQL 数据库(完全不同的过程,未在此屏幕截图中显示)。

我还没有使用过并行任务(甚至不知道它是否是一个正确的解决方案),所以非常感谢任何帮助。

获取你的第一条记录并在循环前先设置总数:

var data = await GetFSLData(offset.ToString(),"Products");

JObject Jresult = JObject.Parse(data);

Int32.TryParse(Jresult.SelectToken("total").ToString(),out total);         
                

在下一步中,您可以并行化您的任务:

DataTable resTbl = CreateDt();
var downloadTasks = new List<Task>();
while (offset < total)
{
    downloadTasks.Add(GetFSLData(offset.ToString(),"Products"));
    offset += 20;
}

然后就可以使用Task.WhenAll获取数据

var httpResults = await Task.WhenAll(downloadTasks);
foreach (var jObjectResult in httpResults.Select(JObject.Parse)) 
{
  resTbl = WriteInDataTable(resTbl, Jresult);
}

只是一些需要注意的事情:您会同时遇到很多请求 api,这可能不是一个好主意。如果您 运行 遇到此问题,则可以在 TPL 数据流库中使用 TransformBlockActionBlock。您可以在此处找到更多相关信息:

https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library

您可以使用 Task.WhenAll 来并行 运行 您的请求。

    public async Task<IEnumerable<string>> GetDataInParallel()
    {
        var tasks = new List<Task<string>>();
        
        while(...)
        {
           var dataTask = GetFastLaneData(offset.ToString(), "Products"); // does not launch request, only add task to a list
           tasks.Add(dataTask);
           offset += 20
        }

        var datas = await Task.WhenAll(tasks); // launch all tasks

        return datas;
    }

此方法将尝试创建或使用 1000 个线程并对其进行管理,这可能会影响性能,但会比按顺序启动它们快得多。您可以考虑对它们进行批处理以获得 更好的性能 并一次启动大约 100 个任务。

由于代码中的高抽象级别(恕我直言,这很好,但很难在 SO 这样的页面上发现错误),因此很难知道您真正使用和获得的是什么。

所以这里只是一个关于如何将所有请求并行化到 API 以缩短获取时间并将结果一次性写入数据库的草图。也许 API 上有一些配额,您可能必须 运行 这些东西成块,但这可以通过 LINQ 轻松采用。

var httpClient = new HttpClient();
var requests = Enumerable.Range(0, 1500)
    .Where(i => i % 20 == 0)
    // Create all needed requests
    .Select(offset => $"https://www.someDummyAPI.com/Api/Products?offset={offset}&count=20")
    .Select(url => new HttpRequestMessage(HttpMethod.Get, url))
    // Create tasks to call these requests
    .Select(request => httpClient.SendAsync(request));

// Run all of these requests in parallel.
var responses = await Task.WhenAll(requests);
// Create all tasks to get the content out of the requests
var allContentStreams = responses
    .Select(response => response.Content.ReadAsStringAsync());

// Retrieve all content bodies as strings
var allRawContents = await Task.WhenAll(allContentStreams);
// Serialize strings into some usable object
var allData = allRawContents
    .Select(JsonConvert.DeserializeObject<MyDataDTO>);

// Add all objects to the database context.
foreach (var data in allData)
{
    WriteIntoDatabase(data);
}

// Let context persist data into database.
SaveDatabase();

如果您已经升级到 .NET 6 平台,您可以考虑使用 Parallel.ForEachAsync method to parallelize the GetFSLData invocations. This method requires an IEnumerable<T> sequence as source. You can create this sequence using LINQ (the Enumerable.Range method). To avoid any problems associated with the thread-safety of the DataTable class, you can store the JObject results in an intermediate ConcurrentQueue<JObject> 集合,并推迟 DataTable 的创建,直到所有数据都已获取并在本地可用.您可能还需要存储与每个 JObject 关联的 offset,以便可以按其原始顺序插入结果。把所有东西放在一起:

protected async void FSL_Sync_btn_Click(object sender, EventArgs e)
{
    int total = Int32.MaxValue;
    IEnumerable<int> offsets = Enumerable
        .Range(0, Int32.MaxValue)
        .Select(n => checked(n * 20))
        .TakeWhile(offset => offset < Volatile.Read(ref total));

    var options = new ParallelOptions() { MaxDegreeOfParallelism = 10 };
    var results = new ConcurrentQueue<(int Offset, JObject JResult)>();
    await Parallel.ForEachAsync(offsets, options, async (offset, ct) =>
    {
        string data = await GetFSLData(offset.ToString(), "Products");
        JObject Jresult = JObject.Parse(data);
        if (offset == 0)
        {
            Volatile.Write(ref total,
                Int32.Parse(Jresult.SelectToken("total").ToString()));
        }
        results.Enqueue((offset, Jresult));
    });

    DataTable resTbl = CreateDt();
    foreach (var (offset, Jresult) in results.OrderBy(e => e.Offset))
    {
        resTbl = WriteInDataTable(resTbl, Jresult);
    }
}

Volatile.Read/Volatile.Write 是必需的,因为 total 变量可能会被多个线程并行访问。

为了获得最佳性能,您可能需要根据远程服务器的功能和您的互联网连接调整 MaxDegreeOfParallelism 配置。

注意:此解决方案在内存方面效率不高,因为它要求所有数据同时以两种不同格式存储在内存中。