TPL Dataflow for WebCrawler
I want to create a web crawler that will download the page located at some URL, search for certain elements, and then build a result for it that is ready to be saved to the database. However, I want the DB part to save in batches.

The last part is what makes the whole exercise a bit difficult (at least for my current understanding of TPL Dataflow, which is about one day old ;)). I know there is a BatchBlock element, but the scenario I have seen it used in was simple: it was the first step and it was "batching" the input given to the application, not working inside the pipeline. I have tried to put this batching part somewhere inside the pipeline, but then I am either forced to pass a whole list of URLs to the first step (in which case the download-URL phase becomes a single step and the other steps have to wait until it finishes), or I can pass one URL into the pipeline, but then the batching starts from one URL and there is a single parsed element to save to the DB :)
This is what I would like to achieve:

What is important, of course, is that each "download URL" operation is independent of the other "download URL" operations. So as soon as some page is downloaded, it can immediately go to the web-scraping part, and as soon as the scraped result is ready, it can go to the save-to-DB stage (which therefore waits until a batch of x elements has arrived - say 5 - and then saves it to the DB).

It goes without saying that both the "download URL" and "webscrape necessary data" transformations are asynchronous operations.

Maybe this is not a problem you can solve with TPL Dataflow? Please advise :)
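Roughly, this is the pipeline shape I have in mind (just a sketch to illustrate the idea - DownloadPageAsync, ScrapeAsync, SaveBatchToDbAsync and ScrapedItem are made-up placeholders, not working code):

// Sketch only: the helper methods and the ScrapedItem type are placeholders.
var downloadUrl = new TransformBlock<string, string>(
    async url => await DownloadPageAsync(url),            // async download, independent per URL
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
var scrape = new TransformBlock<string, ScrapedItem>(
    async html => await ScrapeAsync(html),                // async scraping of the downloaded page
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
var batch = new BatchBlock<ScrapedItem>(5);               // group results into batches of 5
var save = new ActionBlock<ScrapedItem[]>(
    async items => await SaveBatchToDbAsync(items));      // one DB call per batch
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
downloadUrl.LinkTo(scrape, linkOptions);
scrape.LinkTo(batch, linkOptions);
batch.LinkTo(save, linkOptions);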
[Update - Aug 7, 2020, 13:25]
OK, yesterday I made the wrong assumption that I could only post one item into the pipeline, because the signature takes a single string. That was obviously a wrong assumption, since I can simply call Post multiple times :)
I now have a more or less working example, but two things are missing: changing it to async, and figuring out how to flush the BatchBlock. Because if I have a BatchBlock of size 3 and I send 8 URLs into the pipeline, I only get responses for the first 6.

The other problem with this example is that even when no flush is needed (say I send 9 URLs and the BatchBlock size is 3), the program still runs indefinitely. Where is the problem?
Console.WriteLine($"Processing started: {DateTime.Now.ToString()}");

var workBuffer = new BatchBlock<string>(3);
var downloadUrl = new TransformBlock<string, string>(url =>
{
    Thread.Sleep(int.Parse(url.Last().ToString()) * 1000);
    return url;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

var parseContent = new TransformBlock<string, string>(content =>
{
    Thread.Sleep(int.Parse(content.Last().ToString()) * 1000 / 2);
    return $"parsing result for: {content}";
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

var saveToDb = new TransformBlock<string[], bool>(results =>
{
    Console.WriteLine($"results: {DateTime.Now.ToString()} {String.Join(", ", results)}");
    return true;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

downloadUrl.LinkTo(parseContent, new DataflowLinkOptions
{
    PropagateCompletion = true
});
parseContent.LinkTo(workBuffer, new DataflowLinkOptions
{
    PropagateCompletion = true
});
workBuffer.LinkTo(saveToDb, new DataflowLinkOptions
{
    PropagateCompletion = true
});

downloadUrl.Completion.ContinueWith(obj => parseContent.Complete());
parseContent.Completion.ContinueWith(obj => workBuffer.Complete());
workBuffer.Completion.ContinueWith(obj => saveToDb.Complete());
//last digit in string is treated as url download time (in seconds) and half of it is for processing time.
downloadUrl.Post("http://some_site_to_parse.com2"); //downloading for this url is 2 sec, processing 1 sec. It will be ready to save to DB after 3 sec
downloadUrl.Post("http://some_site_to_parse.com3"); //downloading for this url is 3 sec, processing 1.5 sec. It will be ready to save to DB after 4.5 sec
downloadUrl.Post("http://some_site_to_parse.com4"); //downloading for this url is 4 sec, processing 2 sec. It will be ready to save to DB after 6 sec
//here should first batch be saved to DB after 6 seconds
downloadUrl.Post("http://some_site_to_parse.com5"); //downloading for this url is 5 sec, processing 2.5 sec. It will be ready to save to DB after 7.5 sec
downloadUrl.Post("http://some_site_to_parse.com6"); //downloading for this url is 6 sec, processing 3 sec. It will be ready to save to DB after 9 sec
downloadUrl.Post("http://some_site_to_parse.com7"); //downloading for this url is 7 sec, processing 3.5 sec. It will be ready to save to DB after 10.5 sec
//here should second batch be saved to DB after 10.5 seconds
downloadUrl.Post("http://some_site_to_parse.com8"); //downloading for this url is 8 sec, processing 4 sec. It will be ready to save to DB after 12 sec
downloadUrl.Post("http://some_site_to_parse.com9"); //downloading for this url is 9 sec, processing 4.5 sec. It will be ready to save to DB after 13.5 sec
downloadUrl.Post("http://some_site_to_parse.com10"); //downloading for this url is 10 sec, processing 5 sec. It will be ready to save to DB after 15 sec
//here should third batch be saved to DB after 15 seconds
downloadUrl.Complete();
saveToDb.Completion.Wait();
To sum up, three questions:
- How do I flush the BatchBlock?
- Why does this example app run indefinitely?
- How do I make it async?
[Update 2 - Aug 7, 2020, 14:28]

Someone suggested this as the solution to my problem:

But I have already added all of the , new DataflowLinkOptions { PropagateCompletion = true } options, and I have added workBuffer.Completion.ContinueWith(obj => saveToDb.Complete()); as well, and it still does not work.
I think this does what you are after...

Firstly, create a client that everyone will use:
private static readonly HttpClient _client = new HttpClient(new HttpClientHandler
{
    AutomaticDecompression = DecompressionMethods.GZip | DecompressionMethods.Deflate
});
Then this is how I build the blocks and link them up:
const int maxDegreeOfParallelism = 10;

// first in, first out buffer block
var uriInputBlock = new BufferBlock<Uri>();

// transform block will download the data to string
var downloadHttpDataBlock = new TransformBlock<Uri, string>(async uri =>
{
    using (var msg = new HttpRequestMessage(HttpMethod.Get, uri))
    using (var resp = await _client.SendAsync(msg, HttpCompletionOption.ResponseHeadersRead))
    {
        return await resp.Content.ReadAsStringAsync().ConfigureAwait(false);
    }
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism });

// this block will take the data and scrape what it wants
var htmlScrapeBlock = new TransformBlock<string, string[]>(data =>
{
    var doc = new HtmlAgilityPack.HtmlDocument();
    doc.LoadHtml(data);
    return doc.DocumentNode.SelectNodes("//a[@href]")
        .Select(x => x.GetAttributeValue("href", string.Empty)).ToArray();
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism });

// take in arrays and send them out as single elements
var manyToOneBlock = new TransformManyBlock<string[], string>(x => x);

// output data to a batch block with grouping of 10
var outputDataBlock = new BatchBlock<string>(10);

// final block to store it somewhere
var databaseBlock = new ActionBlock<string[]>(x =>
{
    Console.WriteLine($"Group of {x.Length} items to be processed:");
    foreach (var uri in x)
    {
        Console.WriteLine($"Store this: {uri}");
    }
});

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
uriInputBlock.LinkTo(downloadHttpDataBlock, linkOptions);
downloadHttpDataBlock.LinkTo(htmlScrapeBlock, linkOptions);
htmlScrapeBlock.LinkTo(manyToOneBlock, linkOptions);
manyToOneBlock.LinkTo(outputDataBlock, linkOptions);
outputDataBlock.LinkTo(databaseBlock, linkOptions);
uriInputBlock.Post(new Uri("https://stackoverflow.com"));
uriInputBlock.Post(new Uri("https://google.com"));
uriInputBlock.Post(new Uri("https://yahoo.com"));
uriInputBlock.Post(new Uri("https://example.com"));

// When you want to complete/close down the pipeline, call this
uriInputBlock.Complete();

// you can wait for all data to finish propagating by calling this:
databaseBlock.Completion.Wait();
This is just a basic concept; obviously you could make this much better, but it should get you started. More info on the many different blocks here.
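Since the question also asks how to make the whole thing asynchronous, here is a minimal sketch (assuming the uriInputBlock and databaseBlock defined above are in scope) of feeding and awaiting the pipeline without blocking:

// Sketch: feed the pipeline with SendAsync and await completion instead of blocking on Wait().
async Task RunPipelineAsync(IEnumerable<Uri> uris)
{
    foreach (var uri in uris)
    {
        await uriInputBlock.SendAsync(uri);   // honours BoundedCapacity if you configure one
    }
    uriInputBlock.Complete();                 // signal that no more input is coming
    await databaseBlock.Completion;           // completes once every batch has been processed
}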
I suggest you have a look at Microsoft's Reactive Framework (aka Rx), as it makes this kind of processing super simple.
If I can assume that you have a List<string> urls and that you have the following methods:
Task<string> DownloadUrlAsync(string url)
Task<string> WebscrapeAsync(string content)
Task SaveDataToDBAsync(IList<string> data)
...then you can do this with Rx:
int buffer_size = 50;

IObservable<Unit> query =
    urls
        .ToObservable()
        .SelectMany(url => Observable.FromAsync(() => DownloadUrlAsync(url)))
        .SelectMany(content => Observable.FromAsync(() => WebscrapeAsync(content)))
        .Buffer(buffer_size)
        .SelectMany(buffer => Observable.FromAsync(() => SaveDataToDBAsync(buffer)));

IDisposable subscription = query.Subscribe();
That query handles all of the asynchronous calls using multiple threads, buffers the content, and saves it to the database.

The .Subscribe method also has callbacks to handle values as they are produced, any exception, and/or completion.
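For example, a subscription that reports each saved batch, any error, and completion could look like this (just a sketch of those callbacks):

IDisposable subscription = query.Subscribe(
    _ => Console.WriteLine("Batch saved to DB."),         // onNext: fires once per saved batch
    ex => Console.WriteLine($"Pipeline failed: {ex}"),    // onError: any exception from the async calls
    () => Console.WriteLine("All URLs processed."));      // onCompleted: the whole sequence finished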
You need to NuGet System.Reactive and add using System.Reactive.Linq; to get the bits.