在递归方法中如何知道我的所有线程何时完成执行?
How to know when all my threads have finished executing when in recursive method?
我一直在从事网络抓取项目。
我有两个问题,一个是以百分比形式显示处理的 url 数量,但一个更大的问题是我无法弄清楚我如何知道我创建的所有线程何时完全完成。
注意:我知道并行 foreach 一旦完成就继续进行,但这是在 递归 方法中。
我的代码如下:
public async Task Scrape(string url)
{
var page = string.Empty;
try
{
page = await _service.Get(url);
if (page != string.Empty)
{
if (regex.IsMatch(page))
{
Parallel.For(0, regex.Matches(page).Count,
index =>
{
try
{
if (regex.Matches(page)[index].Groups[1].Value.StartsWith("/"))
{
var match = regex.Matches(page)[index].Groups[1].Value.ToLower();
if (!links.Contains(BaseUrl + match) && !Visitedlinks.Contains(BaseUrl + match))
{
Uri ValidUri = WebPageValidator.GetUrl(match);
if (ValidUri != null && HostUrls.Contains(ValidUri.Host))
links.Enqueue(match.Replace(".html", ""));
else
links.Enqueue(BaseUrl + match.Replace(".html", ""));
}
}
}
catch (Exception e)
{
log.Error("Error occured: " + e.Message);
Console.WriteLine("Error occured, check log for further details."); ;
}
});
WebPageInternalHandler.SavePage(page, url);
var context = CustomSynchronizationContext.GetSynchronizationContext();
Parallel.ForEach(links, new ParallelOptions { MaxDegreeOfParallelism = 25 },
webpage =>
{
try
{
if (WebPageValidator.ValidUrl(webpage))
{
string linkToProcess = webpage;
if (links.TryDequeue(out linkToProcess) && !Visitedlinks.Contains(linkToProcess))
{
ShowPercentProgress();
Thread.Sleep(15);
Visitedlinks.Enqueue(linkToProcess);
Task d = Scrape(linkToProcess);
Console.Clear();
}
}
}
catch (Exception e)
{
log.Error("Error occured: " + e.Message);
Console.WriteLine("Error occured, check log for further details.");
}
});
Console.WriteLine("parallel finished");
}
}
catch (Exception e)
{
log.Error("Error occured: " + e.Message);
Console.WriteLine("Error occured, check log for further details.");
}
}
注意 Scrape
被多次调用(递归)
像这样调用方法:
public Task ExecuteScrape()
{
var context = CustomSynchronizationContext.GetSynchronizationContext();
Scrape(BaseUrl).ContinueWith(x => {
Visitedlinks.Enqueue(BaseUrl);
}, context).Wait();
return null;
}
它又被这样调用:
static void Main(string[] args)
{
RunScrapper();
Console.ReadLine();
}
public static void RunScrapper()
{
try
{
_scrapper.ExecuteScrape();
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
}
我的结果:
我该如何解决这个问题?
你不能将所有任务 Task d
添加到某种类型的并发集合中,你通过所有递归调用(通过方法参数)线程化,然后简单地调用 Task.WhenAll(tasks).Wait()
吗?
您需要一个中间方法(使其更清晰)来启动基础 Scrape
调用并传入空任务集合。当碱基调用 returns 时,您已完成所有任务,只需等待它们结束即可。
public async Task Scrape (
string url) {
var tasks = new ConcurrentQueue<Task>();
//call your implementation but
//change it so that you add
//all launched tasks d to tasks
Scrape(url, tasks);
//1st option: Wait().
//This will block caller
//until all tasks finish
Task.WhenAll(tasks).Wait();
//or 2nd option: await
//this won't block and will return to caller.
//Once all tasks are finished method
//will resume in WriteLine
await Task.WhenAll(tasks);
Console.WriteLine("Finished!"); }
简单规则:如果您想知道某事何时结束,第一步是跟踪。在您当前的实现中,您实际上是在触发并忘记所有已启动的任务...
(我回答有关网页抓取的问题是否合乎道德?)
不要递归调用Scrape
。将要抓取的 url 列表放入 ConcurrentQueue
并开始处理该队列。由于抓取页面的过程 returns 多 url 次,只需将它们添加到同一个队列中即可。
我也不会只使用字符串。我建议创建一个 class like
public class UrlToScrape //because naming things is hard
{
public string Url { get; set; }
public int Depth { get; set; }
}
无论您如何执行它,它都是递归的,因此您必须以某种方式跟踪您的层级深度。网站可能会故意生成 URL 使您陷入无限递归。 (如果他们这样做,那么他们不希望您抓取他们的网站。有人希望人们抓取他们的网站吗?)
当您的队列为空时,并不意味着您已经完成。队列可能是空的,但是抓取最后一个 url 出队的过程仍然可以将更多项目添加回该队列,因此您需要一种方法来解决这个问题。
您可以使用线程安全计数器(int
使用 Interlocked.Increment/Decrement
),当您开始处理 url 时递增并在完成时递减。当队列为空时,您就完成了 并且 进程中 url 的计数为零。
这是一个非常粗略的模型来说明这个概念,而不是我所说的精细解决方案。比如你还需要考虑异常处理,我也不知道结果去哪里等等
public class UrlScraper
{
private readonly ConcurrentQueue<UrlToScrape> _queue = new ConcurrentQueue<UrlToScrape>();
private int _inProcessUrlCounter;
private readonly List<string> _processedUrls = new List<string>();
public UrlScraper(IEnumerable<string> urls)
{
foreach (var url in urls)
{
_queue.Enqueue(new UrlToScrape {Url = url, Depth = 1});
}
}
public void ScrapeUrls()
{
while (_queue.TryDequeue(out var dequeuedUrl) || _inProcessUrlCounter > 0)
{
if (dequeuedUrl != null)
{
// Make sure you don't go more levels deep than you want to.
if (dequeuedUrl.Depth > 5) continue;
if (_processedUrls.Contains(dequeuedUrl.Url)) continue;
_processedUrls.Add(dequeuedUrl.Url);
Interlocked.Increment(ref _inProcessUrlCounter);
var url = dequeuedUrl;
Task.Run(() => ProcessUrl(url));
}
}
}
private void ProcessUrl(UrlToScrape url)
{
try
{
// As the process discovers more urls to scrape,
// pretend that this is one of those new urls.
var someNewUrl = "http://discovered";
_queue.Enqueue(new UrlToScrape { Url = someNewUrl, Depth = url.Depth + 1 });
}
catch (Exception ex)
{
// whatever you want to do with this
}
finally
{
Interlocked.Decrement(ref _inProcessUrlCounter);
}
}
}
如果我真的这样做,ProcessUrl
方法将是它自己的 class,并且需要 HTML,而不是 URL。在这种形式下很难进行单元测试。如果它在一个单独的 class 中,那么您可以传入 HTML,验证它是否在某处输出结果,并且它调用一个方法来将它找到的新 URL 加入队列。
将队列作为数据库维护也是一个不错的主意table。否则,如果您正在处理一堆 url 并且必须停止,那么您将重新开始。
我一直在从事网络抓取项目。
我有两个问题,一个是以百分比形式显示处理的 url 数量,但一个更大的问题是我无法弄清楚我如何知道我创建的所有线程何时完全完成。
注意:我知道并行 foreach 一旦完成就继续进行,但这是在 递归 方法中。
我的代码如下:
public async Task Scrape(string url)
{
var page = string.Empty;
try
{
page = await _service.Get(url);
if (page != string.Empty)
{
if (regex.IsMatch(page))
{
Parallel.For(0, regex.Matches(page).Count,
index =>
{
try
{
if (regex.Matches(page)[index].Groups[1].Value.StartsWith("/"))
{
var match = regex.Matches(page)[index].Groups[1].Value.ToLower();
if (!links.Contains(BaseUrl + match) && !Visitedlinks.Contains(BaseUrl + match))
{
Uri ValidUri = WebPageValidator.GetUrl(match);
if (ValidUri != null && HostUrls.Contains(ValidUri.Host))
links.Enqueue(match.Replace(".html", ""));
else
links.Enqueue(BaseUrl + match.Replace(".html", ""));
}
}
}
catch (Exception e)
{
log.Error("Error occured: " + e.Message);
Console.WriteLine("Error occured, check log for further details."); ;
}
});
WebPageInternalHandler.SavePage(page, url);
var context = CustomSynchronizationContext.GetSynchronizationContext();
Parallel.ForEach(links, new ParallelOptions { MaxDegreeOfParallelism = 25 },
webpage =>
{
try
{
if (WebPageValidator.ValidUrl(webpage))
{
string linkToProcess = webpage;
if (links.TryDequeue(out linkToProcess) && !Visitedlinks.Contains(linkToProcess))
{
ShowPercentProgress();
Thread.Sleep(15);
Visitedlinks.Enqueue(linkToProcess);
Task d = Scrape(linkToProcess);
Console.Clear();
}
}
}
catch (Exception e)
{
log.Error("Error occured: " + e.Message);
Console.WriteLine("Error occured, check log for further details.");
}
});
Console.WriteLine("parallel finished");
}
}
catch (Exception e)
{
log.Error("Error occured: " + e.Message);
Console.WriteLine("Error occured, check log for further details.");
}
}
注意 Scrape
被多次调用(递归)
像这样调用方法:
public Task ExecuteScrape()
{
var context = CustomSynchronizationContext.GetSynchronizationContext();
Scrape(BaseUrl).ContinueWith(x => {
Visitedlinks.Enqueue(BaseUrl);
}, context).Wait();
return null;
}
它又被这样调用:
static void Main(string[] args)
{
RunScrapper();
Console.ReadLine();
}
public static void RunScrapper()
{
try
{
_scrapper.ExecuteScrape();
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
}
我的结果:
我该如何解决这个问题?
你不能将所有任务 Task d
添加到某种类型的并发集合中,你通过所有递归调用(通过方法参数)线程化,然后简单地调用 Task.WhenAll(tasks).Wait()
吗?
您需要一个中间方法(使其更清晰)来启动基础 Scrape
调用并传入空任务集合。当碱基调用 returns 时,您已完成所有任务,只需等待它们结束即可。
public async Task Scrape (
string url) {
var tasks = new ConcurrentQueue<Task>();
//call your implementation but
//change it so that you add
//all launched tasks d to tasks
Scrape(url, tasks);
//1st option: Wait().
//This will block caller
//until all tasks finish
Task.WhenAll(tasks).Wait();
//or 2nd option: await
//this won't block and will return to caller.
//Once all tasks are finished method
//will resume in WriteLine
await Task.WhenAll(tasks);
Console.WriteLine("Finished!"); }
简单规则:如果您想知道某事何时结束,第一步是跟踪。在您当前的实现中,您实际上是在触发并忘记所有已启动的任务...
(我回答有关网页抓取的问题是否合乎道德?)
不要递归调用Scrape
。将要抓取的 url 列表放入 ConcurrentQueue
并开始处理该队列。由于抓取页面的过程 returns 多 url 次,只需将它们添加到同一个队列中即可。
我也不会只使用字符串。我建议创建一个 class like
public class UrlToScrape //because naming things is hard
{
public string Url { get; set; }
public int Depth { get; set; }
}
无论您如何执行它,它都是递归的,因此您必须以某种方式跟踪您的层级深度。网站可能会故意生成 URL 使您陷入无限递归。 (如果他们这样做,那么他们不希望您抓取他们的网站。有人希望人们抓取他们的网站吗?)
当您的队列为空时,并不意味着您已经完成。队列可能是空的,但是抓取最后一个 url 出队的过程仍然可以将更多项目添加回该队列,因此您需要一种方法来解决这个问题。
您可以使用线程安全计数器(int
使用 Interlocked.Increment/Decrement
),当您开始处理 url 时递增并在完成时递减。当队列为空时,您就完成了 并且 进程中 url 的计数为零。
这是一个非常粗略的模型来说明这个概念,而不是我所说的精细解决方案。比如你还需要考虑异常处理,我也不知道结果去哪里等等
public class UrlScraper
{
private readonly ConcurrentQueue<UrlToScrape> _queue = new ConcurrentQueue<UrlToScrape>();
private int _inProcessUrlCounter;
private readonly List<string> _processedUrls = new List<string>();
public UrlScraper(IEnumerable<string> urls)
{
foreach (var url in urls)
{
_queue.Enqueue(new UrlToScrape {Url = url, Depth = 1});
}
}
public void ScrapeUrls()
{
while (_queue.TryDequeue(out var dequeuedUrl) || _inProcessUrlCounter > 0)
{
if (dequeuedUrl != null)
{
// Make sure you don't go more levels deep than you want to.
if (dequeuedUrl.Depth > 5) continue;
if (_processedUrls.Contains(dequeuedUrl.Url)) continue;
_processedUrls.Add(dequeuedUrl.Url);
Interlocked.Increment(ref _inProcessUrlCounter);
var url = dequeuedUrl;
Task.Run(() => ProcessUrl(url));
}
}
}
private void ProcessUrl(UrlToScrape url)
{
try
{
// As the process discovers more urls to scrape,
// pretend that this is one of those new urls.
var someNewUrl = "http://discovered";
_queue.Enqueue(new UrlToScrape { Url = someNewUrl, Depth = url.Depth + 1 });
}
catch (Exception ex)
{
// whatever you want to do with this
}
finally
{
Interlocked.Decrement(ref _inProcessUrlCounter);
}
}
}
如果我真的这样做,ProcessUrl
方法将是它自己的 class,并且需要 HTML,而不是 URL。在这种形式下很难进行单元测试。如果它在一个单独的 class 中,那么您可以传入 HTML,验证它是否在某处输出结果,并且它调用一个方法来将它找到的新 URL 加入队列。
将队列作为数据库维护也是一个不错的主意table。否则,如果您正在处理一堆 url 并且必须停止,那么您将重新开始。