并行请求抓取网站的多个页面

Parallel request to scrape multiple pages of a website

我想抓取一个包含大量包含有趣数据的页面的网站,但由于源非常大,我想使用多线程并限制过载。 我使用 Parallel.ForEach 来启动 10 个任务的每个块,然后在主 for 循环中等待,直到活动线程数开始下降到阈值以下。为此,我使用了一个活动线程计数器,当使用 WebClient 启动新线程时我会递增,当 WebClientDownloadStringCompleted 事件被触发时递减。

最初的问题是如何使用 DownloadStringTaskAsync 而不是 DownloadString 并等待 Parallel.ForEach 中启动的每个线程完成。这已通过解决方法解决: 主 foor 循环中的一个计数器 (activeThreads) 和一个 Thread.Sleep

使用 await DownloadStringTaskAsync 而不是 DownloadString 是否应该通过在等待 DownloadString 数据到达时释放线程来提高速度?

回到最初的问题,有没有一种方法可以使用 TPL 更优雅地做到这一点,而无需涉及计数器的解决方法?

private static volatile int activeThreads = 0;

public static void RecordData()
{
  var nbThreads = 10;
  var source = db.ListOfUrls; // Thousands urls
  var iterations = source.Length / groupSize; 
  for (int i = 0; i < iterations; i++)
  {
    var subList = source.Skip(groupSize* i).Take(groupSize);
    Parallel.ForEach(subList, (item) => RecordUri(item)); 
    //I want to wait here until process further data to avoid overload
    while (activeThreads > 30) Thread.Sleep(100);
  }
}

private static async Task RecordUri(Uri uri)
{
   using (WebClient wc = new WebClient())
   {
      Interlocked.Increment(ref activeThreads);
      wc.DownloadStringCompleted += (sender, e) => Interlocked.Decrement(ref iterationsCount);
      var jsonData = "";
      RootObject root;
      jsonData = await wc.DownloadStringTaskAsync(uri);
      var root = JsonConvert.DeserializeObject<RootObject>(jsonData);
      RecordData(root)
    }
}
Parallel.ForEach

将创建 ProcessorCount 任务来执行源 Enumerable 中每个项目的函数。它会注意没有太多的任务,并等待所有项目和任务被执行。

Task.WhenAll

只等待给定的任务而不执行它们。以适当的方式执行它们,而不是一次执行多个。

但是你的代码有问题。函数 RecordUri 将 return 必须等待的任务,否则 ForEach 只会创建越来越多的任务,因为该函数永远不知道当前任务何时完成。同样有问题的是,您在任务中创建了一个任务,而第一个任务什么都不做,然后等待第一个任务。

您可能还想看一下 Parallel.ForEach 的重载 https://msdn.microsoft.com/en-us/library/dd782934(v=vs.110).aspx

编辑

Is using await DownloadStringTaskAsync instead of DownloadString supposed to improve at all the speed by freeing a thread while waiting for the DownloadString data to arrive ?

没有。当任务正在等待外部资源时,它会进入暂停状态(Windows api 未使用某些 old/dirty 迭代等待)。所以没有太大区别。 不同之处在于编译器在编译异步代码时会产生的开销。 DownloadStringTaskAsync 将创建一个包含长操作的任务。如果你使用等待它,你将自己附加到该任务(通过 ContinueWith)。因此,您只需创建一个任务来等待另一个任务。这就是我在上文中所说的开销。

我的方法是:在 Parallel.ForEach 中使用 synchronous method。 Threadding 将由 PLinq 完成,您可以继续。

记得"KISS"

如果您想要一个优雅的解决方案,您应该使用 Microsoft 的 Reactive Framework。非常简单:

var source = db.ListOfUrls; // Thousands urls

var query =
    from uri in source.ToObservable()
    from jsonData in Observable.Using(
        () => new WebClient(),
        wc => Observable.FromAsync(() => wc.DownloadStringTaskAsync(uri)))
    select new { uri, json = JsonConvert.DeserializeObject<RootObject>(jsonData) };

IDisposable subscription =
    query.Subscribe(x =>
    {
        /* Do something with x.uri && x.json */
    });

这就是全部代码。它是很好的多线程,而且它一直在控制之下。

只需 NuGet "System.Reactive" 即可获取位。