TPL Dataflow for a WebCrawler

I want to create a web crawler that downloads the page at a given URL, searches it for some elements, and then builds a result from them that is ready to be saved to a database. However, I would like the DB part to save in batches.

That last part is what makes this whole exercise a bit difficult (at least with my current understanding of TPL Dataflow, which is about one day old ;)). I know there is a BatchBlock element, but in the scenarios where I have seen it, it is used in a simple way: it is the first step and it "batches" the input given to the application (not work happening inside the pipeline). I have tried to put this batching part somewhere inside the pipeline, but then I am either forced to pass a list of URLs to the first step (in which case the "download URL" phase becomes a single step and the other steps wait until it finishes), or I can pass one URL at a time into the pipeline, but then there is nothing to batch, because one URL produces just one parsed element to save to the DB :)

This is what I would like to achieve:

The important thing, of course, is that each "download URL" operation is independent of the other "download URL" operations. So as soon as a given page has been downloaded, it can immediately move on to the web-scraping part, and as soon as that is ready, it can immediately move on to the save-to-DB stage (which therefore waits until a batch of x elements arrives, e.g. 5, and then saves them to the DB).

It goes without saying that both the "download URL" and "webscrape the necessary data" transformations are async operations.
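Just to illustrate what I mean by that, here is roughly the shape of async transform I have in mind (only a sketch, assuming a shared HttpClient called client):

var downloadUrl = new TransformBlock<string, string>(
    async url => await client.GetStringAsync(url),   // TransformBlock accepts an async delegate directly
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });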

Maybe this isn't a problem that can be solved with TPL Dataflow? Please advise :)

[Update - 7 Aug 2020, 13:25]

OK, yesterday I made a wrong assumption that I could only post a single item into the pipeline, because the signature expects a string. That was obviously wrong, since I can simply call Post multiple times :)

I now have a more or less working example, but two things are missing: making it async, and how to flush the BatchBlock. With a BatchBlock of size 3, if I send 8 URLs into the pipeline I only get responses for the first 6.

Another problem with this example is that even when no flushing is needed (say I send 9 URLs and the BatchBlock size is 3), the program still runs indefinitely. Where is the problem?

Console.WriteLine($"Processing started: {DateTime.Now.ToString()}");
var workBuffer = new BatchBlock<string>(3);
var downloadUrl = new TransformBlock<string, string>(url =>
{
    Thread.Sleep(int.Parse(url.Last().ToString()) * 1000);
    return url;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

var parseContent = new TransformBlock<string, string>(content =>
{
    Thread.Sleep(int.Parse(content.Last().ToString()) * 1000 / 2);
    return $"parsing result for: {content}";
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

var saveToDb = new TransformBlock<string[], bool>(results =>
{
    Console.WriteLine($"results: {DateTime.Now.ToString()} {String.Join(", ", results)}");
    return true;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

downloadUrl.LinkTo(parseContent, new DataflowLinkOptions
{
    PropagateCompletion = true
});
parseContent.LinkTo(workBuffer, new DataflowLinkOptions
{
    PropagateCompletion = true
});
workBuffer.LinkTo(saveToDb, new DataflowLinkOptions
{
    PropagateCompletion = true
});

downloadUrl.Completion.ContinueWith(obj => parseContent.Complete());
parseContent.Completion.ContinueWith(obj => workBuffer.Complete());
workBuffer.Completion.ContinueWith(obj => saveToDb.Complete());

//last digit in string is treated as url download time (in seconds) and half of it is for processing time.
downloadUrl.Post("http://some_site_to_parse.com2"); //downloading for this url is 2 sec, processing 1 sec. It will be ready to save to DB after 3 sec
downloadUrl.Post("http://some_site_to_parse.com3"); //downloading for this url is 3 sec, processing 1.5 sec. It will be ready to save to DB after 4.5 sec
downloadUrl.Post("http://some_site_to_parse.com4"); //downloading for this url is 4 sec, processing 2 sec. It will be ready to save to DB after 6 sec
//here the first batch should be saved to DB after 6 seconds
downloadUrl.Post("http://some_site_to_parse.com5"); //downloading for this url is 5 sec, processing 2.5 sec. It will be ready to save to DB after 7.5 sec
downloadUrl.Post("http://some_site_to_parse.com6"); //downloading for this url is 6 sec, processing 3 sec. It will be ready to save to DB after 9 sec
downloadUrl.Post("http://some_site_to_parse.com7"); //downloading for this url is 7 sec, processing 3.5 sec. It will be ready to save to DB after 10.5 sec
//here the second batch should be saved to DB after 10.5 seconds
downloadUrl.Post("http://some_site_to_parse.com8"); //downloading for this url is 8 sec, processing 4 sec. It will be ready to save to DB after 12 sec
downloadUrl.Post("http://some_site_to_parse.com9"); //downloading for this url is 9 sec, processing 4.5 sec. It will be ready to save to DB after 13.5 sec
downloadUrl.Post("http://some_site_to_parse.com10"); //downloading for this url is 10 sec, processing 5 sec. It will be ready to save to DB after 15 sec
//here the third batch should be saved to DB after 15 seconds

downloadUrl.Complete();
saveToDb.Completion.Wait();

To sum up, three questions:

  1. How do I flush the BatchBlock?
  2. Why does this sample application run indefinitely?
  3. How do I make it async?

[Update 2 - 7 Aug 2020, 14:28]

Someone suggested this as the solution to my problem:

But I have already added all of the , new DataflowLinkOptions { PropagateCompletion = true } link options, and I have added workBuffer.Completion.ContinueWith(obj => saveToDb.Complete()); as well, yet it still doesn't work properly.

I think this does what you're looking for...

First, create a single client for everything to use:

private static readonly HttpClient _client = new HttpClient(new HttpClientHandler
{
    AutomaticDecompression = DecompressionMethods.GZip | DecompressionMethods.Deflate
});
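
(Reusing one HttpClient like this is deliberate: HttpClient is safe for concurrent requests, and creating a new instance per request can exhaust sockets under load.)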

Then this is how I build the blocks and link them up:

const int maxDegreeOfParallelism = 10;

// first in, first out buffer block
var uriInputBlock = new BufferBlock<Uri>();

// transform block will download the data to string
var downloadHttpDataBlock = new TransformBlock<Uri, string>(async uri =>
{
    using(var msg = new HttpRequestMessage(HttpMethod.Get, uri))
    using(var resp = await _client.SendAsync(msg, HttpCompletionOption.ResponseHeadersRead))
    {
        return await resp.Content.ReadAsStringAsync().ConfigureAwait(false);
    }
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism });

// this block will take the data and scrape what it wants
var htmlScrapeBlock = new TransformBlock<string, string[]>(data =>
{
    var doc = new HtmlAgilityPack.HtmlDocument();
    doc.LoadHtml(data);
    return doc.DocumentNode.SelectNodes("//a[@href]").
        Select(x => x.GetAttributeValue("href", string.Empty)).ToArray();
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism });

// take in arrays and send them out as single elements
var manyToOneBlock = new TransformManyBlock<string[], string>(x => x);

// output data to a batch block with grouping of 10
var outputDataBlock = new BatchBlock<string>(10);

// final block to store it somewhere
var databaseBlock = new ActionBlock<string[]>(x =>
{
    Console.WriteLine($"Group of {x.Length} items to be processed:");
    foreach (var uri in x)
    {
        Console.WriteLine($"Store this: {uri}");
    }
});

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
uriInputBlock.LinkTo(downloadHttpDataBlock, linkOptions);
downloadHttpDataBlock.LinkTo(htmlScrapeBlock, linkOptions);
htmlScrapeBlock.LinkTo(manyToOneBlock, linkOptions);
manyToOneBlock.LinkTo(outputDataBlock, linkOptions);
outputDataBlock.LinkTo(databaseBlock, linkOptions);

uriInputBlock.Post(new Uri("https://whosebug.com"));
uriInputBlock.Post(new Uri("https://google.com"));
uriInputBlock.Post(new Uri("https://yahoo.com"));
uriInputBlock.Post(new Uri("https://example.com"));

// When you want to complete/close down the pipeline, call this
uriInputBlock.Complete();
// you can wait for all data to finish propagating by calling this:
databaseBlock.Completion.Wait();

This is just the basic concept; obviously you could make it a lot better, but it should get you started. There is more info on the many different block types here.
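Two details worth knowing for your questions about flushing and async: BatchBlock<T> has a TriggerBatch() method that immediately emits whatever it is currently holding as a (possibly smaller) batch, completing the block flushes the remainder as well, and the final wait can be awaited rather than blocked on. A rough sketch, using the block names from above:

// force out a partial batch early, e.g. on a timer or when input goes quiet
outputDataBlock.TriggerBatch();

// from an async method, await completion instead of calling .Wait()
await databaseBlock.Completion;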

I would suggest you have a look at Microsoft's Reactive Framework (aka Rx), as it makes this kind of processing super simple.

If I can assume that you have a List<string> urls and the following methods:

Task<string> DownloadUrlAsync(string url)
Task<string> WebscrapeAsync(string content)
Task SaveDataToDBAsync(IList<string> data)

...then you can do this with Rx:

int buffer_size = 50;
IObservable<Unit> query =
    urls
        .ToObservable()
        .SelectMany(url => Observable.FromAsync(() => DownloadUrlAsync(url)))
        .SelectMany(content => Observable.FromAsync(() => WebscrapeAsync(content)))
        .Buffer(buffer_size)
        .SelectMany(buffer => Observable.FromAsync(() => SaveDataToDBAsync(buffer)));
        
IDisposable subscription = query.Subscribe();

That query handles all of the async calls using multiple threads, buffers the content, and saves it to the database.

The .Subscribe method also has overloads with callbacks to handle the values produced, any exception, and/or completion.
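For example (just a sketch, with placeholder handlers):

IDisposable subscription = query.Subscribe(
    _ => { },                                           // one Unit is produced per saved batch
    ex => Console.WriteLine($"Pipeline failed: {ex}"),  // any exception from download/scrape/save
    () => Console.WriteLine("All urls processed."));    // completion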

You need to NuGet System.Reactive and add using System.Reactive.Linq; to get the bits.