并行请求抓取网站的多个页面
Parallel request to scrape multiple pages of a website
我想抓取一个包含大量包含有趣数据的页面的网站,但由于源非常大,我想使用多线程并限制过载。
我使用 Parallel.ForEach
来启动 10 个任务的每个块,然后在主 for
循环中等待,直到活动线程数开始下降到阈值以下。为此,我使用了一个活动线程计数器,当使用 WebClient
启动新线程时我会递增,当 WebClient
的 DownloadStringCompleted
事件被触发时递减。
最初的问题是如何使用 DownloadStringTaskAsync
而不是 DownloadString
并等待 Parallel.ForEach
中启动的每个线程完成。这已通过解决方法解决:
主 foor 循环中的一个计数器 (activeThreads
) 和一个 Thread.Sleep
。
使用 await DownloadStringTaskAsync
而不是 DownloadString
是否应该通过在等待 DownloadString 数据到达时释放线程来提高速度?
回到最初的问题,有没有一种方法可以使用 TPL 更优雅地做到这一点,而无需涉及计数器的解决方法?
private static volatile int activeThreads = 0;
public static void RecordData()
{
var nbThreads = 10;
var source = db.ListOfUrls; // Thousands urls
var iterations = source.Length / groupSize;
for (int i = 0; i < iterations; i++)
{
var subList = source.Skip(groupSize* i).Take(groupSize);
Parallel.ForEach(subList, (item) => RecordUri(item));
//I want to wait here until process further data to avoid overload
while (activeThreads > 30) Thread.Sleep(100);
}
}
private static async Task RecordUri(Uri uri)
{
using (WebClient wc = new WebClient())
{
Interlocked.Increment(ref activeThreads);
wc.DownloadStringCompleted += (sender, e) => Interlocked.Decrement(ref iterationsCount);
var jsonData = "";
RootObject root;
jsonData = await wc.DownloadStringTaskAsync(uri);
var root = JsonConvert.DeserializeObject<RootObject>(jsonData);
RecordData(root)
}
}
Parallel.ForEach
将创建 ProcessorCount 任务来执行源 Enumerable 中每个项目的函数。它会注意没有太多的任务,并等待所有项目和任务被执行。
Task.WhenAll
只等待给定的任务而不执行它们。以适当的方式执行它们,而不是一次执行多个。
但是你的代码有问题。函数 RecordUri
将 return 必须等待的任务,否则 ForEach 只会创建越来越多的任务,因为该函数永远不知道当前任务何时完成。同样有问题的是,您在任务中创建了一个任务,而第一个任务什么都不做,然后等待第一个任务。
您可能还想看一下 Parallel.ForEach
的重载
https://msdn.microsoft.com/en-us/library/dd782934(v=vs.110).aspx
编辑
Is using await DownloadStringTaskAsync instead of DownloadString supposed to improve at all the speed by freeing a thread while waiting for the DownloadString data to arrive ?
没有。当任务正在等待外部资源时,它会进入暂停状态(Windows api 未使用某些 old/dirty 迭代等待)。所以没有太大区别。
不同之处在于编译器在编译异步代码时会产生的开销。 DownloadStringTaskAsync
将创建一个包含长操作的任务。如果你使用等待它,你将自己附加到该任务(通过 ContinueWith)。因此,您只需创建一个任务来等待另一个任务。这就是我在上文中所说的开销。
我的方法是:在 Parallel.ForEach 中使用 synchronous method。 Threadding 将由 PLinq 完成,您可以继续。
记得"KISS"
如果您想要一个优雅的解决方案,您应该使用 Microsoft 的 Reactive Framework。非常简单:
var source = db.ListOfUrls; // Thousands urls
var query =
from uri in source.ToObservable()
from jsonData in Observable.Using(
() => new WebClient(),
wc => Observable.FromAsync(() => wc.DownloadStringTaskAsync(uri)))
select new { uri, json = JsonConvert.DeserializeObject<RootObject>(jsonData) };
IDisposable subscription =
query.Subscribe(x =>
{
/* Do something with x.uri && x.json */
});
这就是全部代码。它是很好的多线程,而且它一直在控制之下。
只需 NuGet "System.Reactive" 即可获取位。
我想抓取一个包含大量包含有趣数据的页面的网站,但由于源非常大,我想使用多线程并限制过载。
我使用 Parallel.ForEach
来启动 10 个任务的每个块,然后在主 for
循环中等待,直到活动线程数开始下降到阈值以下。为此,我使用了一个活动线程计数器,当使用 WebClient
启动新线程时我会递增,当 WebClient
的 DownloadStringCompleted
事件被触发时递减。
最初的问题是如何使用 DownloadStringTaskAsync
而不是 DownloadString
并等待 Parallel.ForEach
中启动的每个线程完成。这已通过解决方法解决:
主 foor 循环中的一个计数器 (activeThreads
) 和一个 Thread.Sleep
。
使用 await DownloadStringTaskAsync
而不是 DownloadString
是否应该通过在等待 DownloadString 数据到达时释放线程来提高速度?
回到最初的问题,有没有一种方法可以使用 TPL 更优雅地做到这一点,而无需涉及计数器的解决方法?
private static volatile int activeThreads = 0;
public static void RecordData()
{
var nbThreads = 10;
var source = db.ListOfUrls; // Thousands urls
var iterations = source.Length / groupSize;
for (int i = 0; i < iterations; i++)
{
var subList = source.Skip(groupSize* i).Take(groupSize);
Parallel.ForEach(subList, (item) => RecordUri(item));
//I want to wait here until process further data to avoid overload
while (activeThreads > 30) Thread.Sleep(100);
}
}
private static async Task RecordUri(Uri uri)
{
using (WebClient wc = new WebClient())
{
Interlocked.Increment(ref activeThreads);
wc.DownloadStringCompleted += (sender, e) => Interlocked.Decrement(ref iterationsCount);
var jsonData = "";
RootObject root;
jsonData = await wc.DownloadStringTaskAsync(uri);
var root = JsonConvert.DeserializeObject<RootObject>(jsonData);
RecordData(root)
}
}
Parallel.ForEach
将创建 ProcessorCount 任务来执行源 Enumerable 中每个项目的函数。它会注意没有太多的任务,并等待所有项目和任务被执行。
Task.WhenAll
只等待给定的任务而不执行它们。以适当的方式执行它们,而不是一次执行多个。
但是你的代码有问题。函数 RecordUri
将 return 必须等待的任务,否则 ForEach 只会创建越来越多的任务,因为该函数永远不知道当前任务何时完成。同样有问题的是,您在任务中创建了一个任务,而第一个任务什么都不做,然后等待第一个任务。
您可能还想看一下 Parallel.ForEach
的重载
https://msdn.microsoft.com/en-us/library/dd782934(v=vs.110).aspx
编辑
Is using await DownloadStringTaskAsync instead of DownloadString supposed to improve at all the speed by freeing a thread while waiting for the DownloadString data to arrive ?
没有。当任务正在等待外部资源时,它会进入暂停状态(Windows api 未使用某些 old/dirty 迭代等待)。所以没有太大区别。
不同之处在于编译器在编译异步代码时会产生的开销。 DownloadStringTaskAsync
将创建一个包含长操作的任务。如果你使用等待它,你将自己附加到该任务(通过 ContinueWith)。因此,您只需创建一个任务来等待另一个任务。这就是我在上文中所说的开销。
我的方法是:在 Parallel.ForEach 中使用 synchronous method。 Threadding 将由 PLinq 完成,您可以继续。
记得"KISS"
如果您想要一个优雅的解决方案,您应该使用 Microsoft 的 Reactive Framework。非常简单:
var source = db.ListOfUrls; // Thousands urls
var query =
from uri in source.ToObservable()
from jsonData in Observable.Using(
() => new WebClient(),
wc => Observable.FromAsync(() => wc.DownloadStringTaskAsync(uri)))
select new { uri, json = JsonConvert.DeserializeObject<RootObject>(jsonData) };
IDisposable subscription =
query.Subscribe(x =>
{
/* Do something with x.uri && x.json */
});
这就是全部代码。它是很好的多线程,而且它一直在控制之下。
只需 NuGet "System.Reactive" 即可获取位。