Task.Result 由于 HttpClient 请求缓慢而在 Parallel.ForEach 内阻塞
Task.Result blocking inside Parallel.ForEach due to slow HttpClient request
我理解将异步 lambda 与 Parallel.ForEach
一起使用的含义,这就是我在这里不使用它的原因。然后,这迫使我对发出 Http 请求的每个任务使用 .Result
。然而,运行 这个通过性能分析器的简单抓取器显示 .Result
的已用独占时间百分比约为 98%,这显然是由于调用的阻塞性质。
我的问题是:是否有可能对其进行优化以使其仍然是异步的?我不确定在这种情况下是否会有帮助,因为检索 HTML/XML.
可能只需要这么长时间
我是 运行 具有 8 个逻辑内核的 4 核处理器(因此 MaxDegreesOfParallelism = 8
。现在我预计需要大约 2.5 小时来下载和解析 ~51,000 HTML/XML 页的简单财务数据。
我倾向于使用 XmlReader 而不是 Linq2XML 来加速解析,但瓶颈似乎在 .Result
调用处。
虽然这里应该无关紧要,但 SEC 将抓取限制为 10 requests/sec。
public class SECScraper
{
public event EventHandler<ProgressChangedEventArgs> ProgressChangedEvent;
public SECScraper(HttpClient client, FinanceContext financeContext)
{
_client = client;
_financeContext = financeContext;
}
public void Download()
{
_numDownloaded = 0;
_interval = _financeContext.Companies.Count() / 100;
Parallel.ForEach(_financeContext.Companies, new ParallelOptions {MaxDegreeOfParallelism = 8},
company =>
{
RetrieveSECData(company.CIK);
});
}
protected virtual void OnProgressChanged(ProgressChangedEventArgs e)
{
ProgressChangedEvent?.Invoke(this, e);
}
private void RetrieveSECData(int cik)
{
// move this url elsewhere
var url = "https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&CIK=" + cik +
"&type=10-q&dateb=&owner=include&count=100";
var srBody = ReadHTML(url).Result; // consider moving this to srPage
var srPage = new SearchResultsPage(srBody);
var reportLinks = srPage.GetAllReportLinks();
foreach (var link in reportLinks)
{
url = SEC_HOSTNAME + link;
var fdBody = ReadHTML(url).Result;
var fdPage = new FilingDetailsPage(fdBody);
var xbrlLink = fdPage.GetInstanceDocumentLink();
var xbrlBody = ReadHTML(SEC_HOSTNAME + xbrlLink).Result;
var xbrlDoc = new XBRLDocument(xbrlBody);
var epsData = xbrlDoc.GetAllEPSData();
//foreach (var eps in epsData)
// Console.WriteLine($"{eps.StartDate} to {eps.EndDate} -- {eps.EPS}");
}
IncrementNumDownloadedAndNotify();
}
private async Task<string> ReadHTML(string url)
{
using var response = await _client.GetAsync(url);
return await response.Content.ReadAsStringAsync();
}
}
该任务不受 CPU 约束,而是受网络约束,因此无需使用多线程。
在一个线程上进行多个异步调用。 只是不要等待他们。 将任务列在清单上。当你在那里达到一定数量时(比如你想一次去 10 个),开始等待第一个完成(查看 'task, WhenAny' 了解更多信息)。
然后放更多 :-) 然后您可以使用其他代码通过 #/second 控制任务的大小。
is there any possibility of optimizing this for it to still be async?
是的。我不确定你为什么首先使用 Parallel
;对于此类问题,这似乎是错误的解决方案。您需要跨项目集合执行异步工作,因此更适合异步并发;这是使用 Task.WhenAll
:
完成的
public class SECScraper
{
public async Task DownloadAsync()
{
_numDownloaded = 0;
_interval = _financeContext.Companies.Count() / 100;
var tasks = _financeContext.Companies.Select(company => RetrieveSECDataAsync(company.CIK)).ToList();
await Task.WhenAll(tasks);
}
private async Task RetrieveSECDataAsync(int cik)
{
var url = "https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&CIK=" + cik +
"&type=10-q&dateb=&owner=include&count=100";
var srBody = await ReadHTMLAsync(url);
var srPage = new SearchResultsPage(srBody);
var reportLinks = srPage.GetAllReportLinks();
foreach (var link in reportLinks)
{
url = SEC_HOSTNAME + link;
var fdBody = await ReadHTMLAsync(url);
var fdPage = new FilingDetailsPage(fdBody);
var xbrlLink = fdPage.GetInstanceDocumentLink();
var xbrlBody = await ReadHTMLAsync(SEC_HOSTNAME + xbrlLink);
var xbrlDoc = new XBRLDocument(xbrlBody);
var epsData = xbrlDoc.GetAllEPSData();
}
IncrementNumDownloadedAndNotify();
}
private async Task<string> ReadHTMLAsync(string url)
{
using var response = await _client.GetAsync(url);
return await response.Content.ReadAsStringAsync();
}
}
此外,我建议您使用 IProgress<T>
报告进度。
我理解将异步 lambda 与 Parallel.ForEach
一起使用的含义,这就是我在这里不使用它的原因。然后,这迫使我对发出 Http 请求的每个任务使用 .Result
。然而,运行 这个通过性能分析器的简单抓取器显示 .Result
的已用独占时间百分比约为 98%,这显然是由于调用的阻塞性质。
我的问题是:是否有可能对其进行优化以使其仍然是异步的?我不确定在这种情况下是否会有帮助,因为检索 HTML/XML.
可能只需要这么长时间我是 运行 具有 8 个逻辑内核的 4 核处理器(因此 MaxDegreesOfParallelism = 8
。现在我预计需要大约 2.5 小时来下载和解析 ~51,000 HTML/XML 页的简单财务数据。
我倾向于使用 XmlReader 而不是 Linq2XML 来加速解析,但瓶颈似乎在 .Result
调用处。
虽然这里应该无关紧要,但 SEC 将抓取限制为 10 requests/sec。
public class SECScraper
{
public event EventHandler<ProgressChangedEventArgs> ProgressChangedEvent;
public SECScraper(HttpClient client, FinanceContext financeContext)
{
_client = client;
_financeContext = financeContext;
}
public void Download()
{
_numDownloaded = 0;
_interval = _financeContext.Companies.Count() / 100;
Parallel.ForEach(_financeContext.Companies, new ParallelOptions {MaxDegreeOfParallelism = 8},
company =>
{
RetrieveSECData(company.CIK);
});
}
protected virtual void OnProgressChanged(ProgressChangedEventArgs e)
{
ProgressChangedEvent?.Invoke(this, e);
}
private void RetrieveSECData(int cik)
{
// move this url elsewhere
var url = "https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&CIK=" + cik +
"&type=10-q&dateb=&owner=include&count=100";
var srBody = ReadHTML(url).Result; // consider moving this to srPage
var srPage = new SearchResultsPage(srBody);
var reportLinks = srPage.GetAllReportLinks();
foreach (var link in reportLinks)
{
url = SEC_HOSTNAME + link;
var fdBody = ReadHTML(url).Result;
var fdPage = new FilingDetailsPage(fdBody);
var xbrlLink = fdPage.GetInstanceDocumentLink();
var xbrlBody = ReadHTML(SEC_HOSTNAME + xbrlLink).Result;
var xbrlDoc = new XBRLDocument(xbrlBody);
var epsData = xbrlDoc.GetAllEPSData();
//foreach (var eps in epsData)
// Console.WriteLine($"{eps.StartDate} to {eps.EndDate} -- {eps.EPS}");
}
IncrementNumDownloadedAndNotify();
}
private async Task<string> ReadHTML(string url)
{
using var response = await _client.GetAsync(url);
return await response.Content.ReadAsStringAsync();
}
}
该任务不受 CPU 约束,而是受网络约束,因此无需使用多线程。
在一个线程上进行多个异步调用。 只是不要等待他们。 将任务列在清单上。当你在那里达到一定数量时(比如你想一次去 10 个),开始等待第一个完成(查看 'task, WhenAny' 了解更多信息)。
然后放更多 :-) 然后您可以使用其他代码通过 #/second 控制任务的大小。
is there any possibility of optimizing this for it to still be async?
是的。我不确定你为什么首先使用 Parallel
;对于此类问题,这似乎是错误的解决方案。您需要跨项目集合执行异步工作,因此更适合异步并发;这是使用 Task.WhenAll
:
public class SECScraper
{
public async Task DownloadAsync()
{
_numDownloaded = 0;
_interval = _financeContext.Companies.Count() / 100;
var tasks = _financeContext.Companies.Select(company => RetrieveSECDataAsync(company.CIK)).ToList();
await Task.WhenAll(tasks);
}
private async Task RetrieveSECDataAsync(int cik)
{
var url = "https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&CIK=" + cik +
"&type=10-q&dateb=&owner=include&count=100";
var srBody = await ReadHTMLAsync(url);
var srPage = new SearchResultsPage(srBody);
var reportLinks = srPage.GetAllReportLinks();
foreach (var link in reportLinks)
{
url = SEC_HOSTNAME + link;
var fdBody = await ReadHTMLAsync(url);
var fdPage = new FilingDetailsPage(fdBody);
var xbrlLink = fdPage.GetInstanceDocumentLink();
var xbrlBody = await ReadHTMLAsync(SEC_HOSTNAME + xbrlLink);
var xbrlDoc = new XBRLDocument(xbrlBody);
var epsData = xbrlDoc.GetAllEPSData();
}
IncrementNumDownloadedAndNotify();
}
private async Task<string> ReadHTMLAsync(string url)
{
using var response = await _client.GetAsync(url);
return await response.Content.ReadAsStringAsync();
}
}
此外,我建议您使用 IProgress<T>
报告进度。