如何使多个 API 调用更快?
How to make multiple API calls faster?
我正在请求某种产品的数据 API,但问题是我得到的数据是 20 x 20。所以端点看起来像这样:
https://www.someDummyAPI.com/Api/Products?offset=0&count=20
注意:我无法更改计数,它始终是 20。
即来自该端点的数据将包含 20 条记录,从 0 到 20,之后我必须将偏移量增加 20 以获得下一个 20 条记录,依此类推(总共大约 1500 条记录,所以我必须发出大约 700 条请求)。
获取所有数据后,我使用存储过程将其插入 SQL 数据库(这是不同的过程)。
所以我的问题是,我怎样才能加快获取过程,我考虑过 运行 并行任务,但我需要从响应中获得结果。
现在这个过程看起来像这样:
protected async void FSL_Sync_btn_Click(object sender, EventArgs e)
{
int offset = 0;
int total= 0;
bool isFirst = true;
DataTable resTbl = CreateDt();
while (offset < total || offset == 0)
{
try
{
var data = await GetFSLData(offset.ToString(),"Products");
JObject Jresult = JObject.Parse(data);
if (isFirst)
{
Int32.TryParse(Jresult.SelectToken("total").ToString(),out total);
isFirst = false;
}
// Function to chain up data in DataTable
resTbl = WriteInDataTable(resTbl, Jresult);
offset += 20;
}
catch(Exception ex)
{
var msg = ex.Message;
}
}
}
所以我采用的流程是:
- 从 API 获取数据(比方说前 20 条记录)。
- 使用
WriteInDataTable
函数添加两个现有的 DataTable
。
- 从
resTbl
Datatable
中将数据插入 SQL 数据库(完全不同的过程,未在此屏幕截图中显示)。
我还没有使用过并行任务(甚至不知道它是否是一个正确的解决方案),所以非常感谢任何帮助。
获取你的第一条记录并在循环前先设置总数:
var data = await GetFSLData(offset.ToString(),"Products");
JObject Jresult = JObject.Parse(data);
Int32.TryParse(Jresult.SelectToken("total").ToString(),out total);
在下一步中,您可以并行化您的任务:
DataTable resTbl = CreateDt();
var downloadTasks = new List<Task>();
while (offset < total)
{
downloadTasks.Add(GetFSLData(offset.ToString(),"Products"));
offset += 20;
}
然后就可以使用Task.WhenAll
获取数据
var httpResults = await Task.WhenAll(downloadTasks);
foreach (var jObjectResult in httpResults.Select(JObject.Parse))
{
resTbl = WriteInDataTable(resTbl, Jresult);
}
只是一些需要注意的事情:您会同时遇到很多请求 api,这可能不是一个好主意。如果您 运行 遇到此问题,则可以在 TPL 数据流库中使用 TransformBlock
和 ActionBlock
。您可以在此处找到更多相关信息:
https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library
您可以使用 Task.WhenAll
来并行 运行 您的请求。
public async Task<IEnumerable<string>> GetDataInParallel()
{
var tasks = new List<Task<string>>();
while(...)
{
var dataTask = GetFastLaneData(offset.ToString(), "Products"); // does not launch request, only add task to a list
tasks.Add(dataTask);
offset += 20
}
var datas = await Task.WhenAll(tasks); // launch all tasks
return datas;
}
此方法将尝试创建或使用 1000 个线程并对其进行管理,这可能会影响性能,但会比按顺序启动它们快得多。您可以考虑对它们进行批处理以获得 更好的性能 并一次启动大约 100 个任务。
由于代码中的高抽象级别(恕我直言,这很好,但很难在 SO 这样的页面上发现错误),因此很难知道您真正使用和获得的是什么。
所以这里只是一个关于如何将所有请求并行化到 API 以缩短获取时间并将结果一次性写入数据库的草图。也许 API 上有一些配额,您可能必须 运行 这些东西成块,但这可以通过 LINQ 轻松采用。
var httpClient = new HttpClient();
var requests = Enumerable.Range(0, 1500)
.Where(i => i % 20 == 0)
// Create all needed requests
.Select(offset => $"https://www.someDummyAPI.com/Api/Products?offset={offset}&count=20")
.Select(url => new HttpRequestMessage(HttpMethod.Get, url))
// Create tasks to call these requests
.Select(request => httpClient.SendAsync(request));
// Run all of these requests in parallel.
var responses = await Task.WhenAll(requests);
// Create all tasks to get the content out of the requests
var allContentStreams = responses
.Select(response => response.Content.ReadAsStringAsync());
// Retrieve all content bodies as strings
var allRawContents = await Task.WhenAll(allContentStreams);
// Serialize strings into some usable object
var allData = allRawContents
.Select(JsonConvert.DeserializeObject<MyDataDTO>);
// Add all objects to the database context.
foreach (var data in allData)
{
WriteIntoDatabase(data);
}
// Let context persist data into database.
SaveDatabase();
如果您已经升级到 .NET 6 平台,您可以考虑使用 Parallel.ForEachAsync
method to parallelize the GetFSLData
invocations. This method requires an IEnumerable<T>
sequence as source. You can create this sequence using LINQ (the Enumerable.Range
method). To avoid any problems associated with the thread-safety of the DataTable
class, you can store the JObject
results in an intermediate ConcurrentQueue<JObject>
集合,并推迟 DataTable
的创建,直到所有数据都已获取并在本地可用.您可能还需要存储与每个 JObject
关联的 offset
,以便可以按其原始顺序插入结果。把所有东西放在一起:
protected async void FSL_Sync_btn_Click(object sender, EventArgs e)
{
int total = Int32.MaxValue;
IEnumerable<int> offsets = Enumerable
.Range(0, Int32.MaxValue)
.Select(n => checked(n * 20))
.TakeWhile(offset => offset < Volatile.Read(ref total));
var options = new ParallelOptions() { MaxDegreeOfParallelism = 10 };
var results = new ConcurrentQueue<(int Offset, JObject JResult)>();
await Parallel.ForEachAsync(offsets, options, async (offset, ct) =>
{
string data = await GetFSLData(offset.ToString(), "Products");
JObject Jresult = JObject.Parse(data);
if (offset == 0)
{
Volatile.Write(ref total,
Int32.Parse(Jresult.SelectToken("total").ToString()));
}
results.Enqueue((offset, Jresult));
});
DataTable resTbl = CreateDt();
foreach (var (offset, Jresult) in results.OrderBy(e => e.Offset))
{
resTbl = WriteInDataTable(resTbl, Jresult);
}
}
Volatile.Read
/Volatile.Write
是必需的,因为 total
变量可能会被多个线程并行访问。
为了获得最佳性能,您可能需要根据远程服务器的功能和您的互联网连接调整 MaxDegreeOfParallelism
配置。
注意:此解决方案在内存方面效率不高,因为它要求所有数据同时以两种不同格式存储在内存中。
我正在请求某种产品的数据 API,但问题是我得到的数据是 20 x 20。所以端点看起来像这样:
https://www.someDummyAPI.com/Api/Products?offset=0&count=20
注意:我无法更改计数,它始终是 20。
即来自该端点的数据将包含 20 条记录,从 0 到 20,之后我必须将偏移量增加 20 以获得下一个 20 条记录,依此类推(总共大约 1500 条记录,所以我必须发出大约 700 条请求)。
获取所有数据后,我使用存储过程将其插入 SQL 数据库(这是不同的过程)。
所以我的问题是,我怎样才能加快获取过程,我考虑过 运行 并行任务,但我需要从响应中获得结果。
现在这个过程看起来像这样:
protected async void FSL_Sync_btn_Click(object sender, EventArgs e)
{
int offset = 0;
int total= 0;
bool isFirst = true;
DataTable resTbl = CreateDt();
while (offset < total || offset == 0)
{
try
{
var data = await GetFSLData(offset.ToString(),"Products");
JObject Jresult = JObject.Parse(data);
if (isFirst)
{
Int32.TryParse(Jresult.SelectToken("total").ToString(),out total);
isFirst = false;
}
// Function to chain up data in DataTable
resTbl = WriteInDataTable(resTbl, Jresult);
offset += 20;
}
catch(Exception ex)
{
var msg = ex.Message;
}
}
}
所以我采用的流程是:
- 从 API 获取数据(比方说前 20 条记录)。
- 使用
WriteInDataTable
函数添加两个现有的DataTable
。 - 从
resTbl
Datatable
中将数据插入 SQL 数据库(完全不同的过程,未在此屏幕截图中显示)。
我还没有使用过并行任务(甚至不知道它是否是一个正确的解决方案),所以非常感谢任何帮助。
获取你的第一条记录并在循环前先设置总数:
var data = await GetFSLData(offset.ToString(),"Products");
JObject Jresult = JObject.Parse(data);
Int32.TryParse(Jresult.SelectToken("total").ToString(),out total);
在下一步中,您可以并行化您的任务:
DataTable resTbl = CreateDt();
var downloadTasks = new List<Task>();
while (offset < total)
{
downloadTasks.Add(GetFSLData(offset.ToString(),"Products"));
offset += 20;
}
然后就可以使用Task.WhenAll
获取数据
var httpResults = await Task.WhenAll(downloadTasks);
foreach (var jObjectResult in httpResults.Select(JObject.Parse))
{
resTbl = WriteInDataTable(resTbl, Jresult);
}
只是一些需要注意的事情:您会同时遇到很多请求 api,这可能不是一个好主意。如果您 运行 遇到此问题,则可以在 TPL 数据流库中使用 TransformBlock
和 ActionBlock
。您可以在此处找到更多相关信息:
https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library
您可以使用 Task.WhenAll
来并行 运行 您的请求。
public async Task<IEnumerable<string>> GetDataInParallel()
{
var tasks = new List<Task<string>>();
while(...)
{
var dataTask = GetFastLaneData(offset.ToString(), "Products"); // does not launch request, only add task to a list
tasks.Add(dataTask);
offset += 20
}
var datas = await Task.WhenAll(tasks); // launch all tasks
return datas;
}
此方法将尝试创建或使用 1000 个线程并对其进行管理,这可能会影响性能,但会比按顺序启动它们快得多。您可以考虑对它们进行批处理以获得 更好的性能 并一次启动大约 100 个任务。
由于代码中的高抽象级别(恕我直言,这很好,但很难在 SO 这样的页面上发现错误),因此很难知道您真正使用和获得的是什么。
所以这里只是一个关于如何将所有请求并行化到 API 以缩短获取时间并将结果一次性写入数据库的草图。也许 API 上有一些配额,您可能必须 运行 这些东西成块,但这可以通过 LINQ 轻松采用。
var httpClient = new HttpClient();
var requests = Enumerable.Range(0, 1500)
.Where(i => i % 20 == 0)
// Create all needed requests
.Select(offset => $"https://www.someDummyAPI.com/Api/Products?offset={offset}&count=20")
.Select(url => new HttpRequestMessage(HttpMethod.Get, url))
// Create tasks to call these requests
.Select(request => httpClient.SendAsync(request));
// Run all of these requests in parallel.
var responses = await Task.WhenAll(requests);
// Create all tasks to get the content out of the requests
var allContentStreams = responses
.Select(response => response.Content.ReadAsStringAsync());
// Retrieve all content bodies as strings
var allRawContents = await Task.WhenAll(allContentStreams);
// Serialize strings into some usable object
var allData = allRawContents
.Select(JsonConvert.DeserializeObject<MyDataDTO>);
// Add all objects to the database context.
foreach (var data in allData)
{
WriteIntoDatabase(data);
}
// Let context persist data into database.
SaveDatabase();
如果您已经升级到 .NET 6 平台,您可以考虑使用 Parallel.ForEachAsync
method to parallelize the GetFSLData
invocations. This method requires an IEnumerable<T>
sequence as source. You can create this sequence using LINQ (the Enumerable.Range
method). To avoid any problems associated with the thread-safety of the DataTable
class, you can store the JObject
results in an intermediate ConcurrentQueue<JObject>
集合,并推迟 DataTable
的创建,直到所有数据都已获取并在本地可用.您可能还需要存储与每个 JObject
关联的 offset
,以便可以按其原始顺序插入结果。把所有东西放在一起:
protected async void FSL_Sync_btn_Click(object sender, EventArgs e)
{
int total = Int32.MaxValue;
IEnumerable<int> offsets = Enumerable
.Range(0, Int32.MaxValue)
.Select(n => checked(n * 20))
.TakeWhile(offset => offset < Volatile.Read(ref total));
var options = new ParallelOptions() { MaxDegreeOfParallelism = 10 };
var results = new ConcurrentQueue<(int Offset, JObject JResult)>();
await Parallel.ForEachAsync(offsets, options, async (offset, ct) =>
{
string data = await GetFSLData(offset.ToString(), "Products");
JObject Jresult = JObject.Parse(data);
if (offset == 0)
{
Volatile.Write(ref total,
Int32.Parse(Jresult.SelectToken("total").ToString()));
}
results.Enqueue((offset, Jresult));
});
DataTable resTbl = CreateDt();
foreach (var (offset, Jresult) in results.OrderBy(e => e.Offset))
{
resTbl = WriteInDataTable(resTbl, Jresult);
}
}
Volatile.Read
/Volatile.Write
是必需的,因为 total
变量可能会被多个线程并行访问。
为了获得最佳性能,您可能需要根据远程服务器的功能和您的互联网连接调整 MaxDegreeOfParallelism
配置。
注意:此解决方案在内存方面效率不高,因为它要求所有数据同时以两种不同格式存储在内存中。