用 Rx.js 实现逐页遍历的网页抓取工具

Web scraper iterating over pages with Rx.js

大约一个月前,我用 Async/Await 构建了这个网页抓取工具,以异步方式收集信息。现在我想用 Rx.js 重新构建同一个抓取工具。我通读了文档,觉得它的思路是合理的;最难的是起步,但跨过那道坎之后我取得了一些进展。

如下面的代码所示,我获取了网站的第一页(第 0 页),需要用这一页来获取总页数(大约 6000 页)。我已经拿到了这个数字,并且可以用 getPageURI(page) 构造出每一页的 URL;但问题是,我不知道如何触发(trigger / fire)这些请求,或者说如何把这些信息通过管道(pipe)送回原来的 pageRequestStream。有了页数之后,我需要一种方法来遍历它,并把数据推回最初的 pageRequestStream 流中。

import cheerio from 'cheerio'
import Rx from 'rx'
import fetch from 'isomorphic-fetch'

const DIGITAL_NYC_URI = 'http://www.digital.nyc'

/**
 * Builds the URL of a startup listing page.
 *
 * @param {number} page - listing page index; the first page is 0.
 * @returns {string} absolute URL, e.g. "http://www.digital.nyc/startups?page=0".
 */
const getPageURI = (page) => {
  const url = new URL('/startups', DIGITAL_NYC_URI)
  url.searchParams.set('page', String(page))
  return url.href
}

/**
 * Resolves a profile link (the `href` scraped from a listing page) against
 * the site root.
 *
 * Resolving instead of concatenating works whether or not `profile` starts
 * with "/", and leaves already-absolute URLs unchanged.
 *
 * @param {string} profile - href of a startup's profile link.
 * @returns {string} absolute profile URL.
 */
const getProfileURI = (profile) => new URL(profile, DIGITAL_NYC_URI).href

/**
 * Maps a stream of URLs to a stream of response bodies.
 *
 * @param {Rx.Observable<string>} stream - URLs to request.
 * @param {string} [dataType='json'] - name of the Response body reader to call
 *   (e.g. 'json' or 'text').
 * @returns {Rx.Observable} stream of bodies, one per URL. A non-2xx response
 *   errors the stream instead of emitting the server's error page as data.
 */
function fetchURL(stream, dataType = 'json') {
  return stream.flatMap(requestURL => {
    const body = fetch(requestURL).then(res => {
      // fetch() only rejects on network failure; HTTP errors resolve normally.
      if (!res.ok) {
        throw new Error(`Request to ${requestURL} failed with HTTP ${res.status}`)
      }
      return res[dataType]()
    })
    return Rx.Observable.fromPromise(body)
  })
}

/**
 * Reads a listing page's result summary ("Showing 1 - 20 of N Startups") and
 * returns how many listing pages there are.
 *
 * The summary reports the total number of *startups*, so the total is divided
 * by the page size and rounded up to get the page count.
 *
 * @param {Function} $ - cheerio root for a parsed listing page.
 * @param {number} [pageSize=20] - startups shown per listing page.
 * @returns {number} number of listing pages.
 * @throws {Error} if the summary is missing or not in the expected format.
 */
function getNumberOfPages($, pageSize = 20) {
  const summary = $('.result-summary').text()
  // Accept any visible range and thousands separators ("of 6,123 Startups").
  const match = summary.match(/Showing\s+\d+\s*-\s*\d+\s+of\s+([\d,]+)\s+Startups/)
  if (!match) {
    throw new Error(`Could not find startup total in result summary: "${summary.trim()}"`)
  }
  const total = Number.parseInt(match[1].replace(/,/g, ''), 10)
  return Math.ceil(total / pageSize)
}

/**
 * Collects the startups linked from a parsed listing page.
 *
 * @param {Function} $ - cheerio root for the listing page.
 * @returns {Array<{name: string, profile: string}>} one entry per
 *   `h3.node-title a` link: the link text as `name`, its `href` as `profile`.
 */
function getCompaniesOnPage ($) {
  const toCompany = (index, anchor) => {
    const link = $(anchor)
    const name = link.text()
    const profile = link.attr('href')
    return { name, profile }
  }
  return $('h3.node-title a').map(toCompany).get()
}

// Request only the first listing page (page 0) to start with.
const pageRequestStream = Rx.Observable.just(getPageURI(0))

const pageResponseStream = fetchURL(pageRequestStream, 'text')

// This chain is cold: every subscriber re-runs it from the top. Two streams
// below subscribe to it, so share() makes them reuse one fetch of page 0
// instead of downloading it twice.
const parsedPageHTMLStream = pageResponseStream
  .map(html => cheerio.load(html))
  .share()

const numberOfPagesStream = parsedPageHTMLStream.map($ => getNumberOfPages($))

// TODO: turn the count into page URLs and request them, e.g. (assuming the
// value is a page count and pages are numbered from 0):
//   fetchURL(numberOfPagesStream
//     .flatMap(pageCount => Rx.Observable.range(1, pageCount - 1))
//     .map(getPageURI), 'text')
// or push URLs into an Rx.Subject that the request pipeline consumes.
numberOfPagesStream.subscribe(
  pageCount => console.log(pageCount),
  err => console.error('Failed to read the page count', err)
)

// getCompaniesOnPage returns an array; flatMap flattens it so this stream
// emits one { name, profile } object at a time.
const companiesOnPageStream = parsedPageHTMLStream.flatMap($ => getCompaniesOnPage($))

companiesOnPageStream.subscribe(
  company => console.log(company),
  err => console.error('Failed to read companies', err)
)

// Sketch for attaching each profile page's HTML as company.profileHTML.
// A fetch Response has .text(), not .html(), and flatMap (not map) is needed
// so the stream emits companies rather than Promises:
// const companyProfileStream = companiesOnPageStream.flatMap(company =>
//   fetchURL(Rx.Observable.just(getProfileURI(company.profile)), 'text')
//     .map(profileHTML => Object.assign({}, company, { profileHTML }))
// )

可以看看 Subject:它既是 Observer 又是 Observable,允许你随时手动推送(触发)事件,因此你可以把新算出的页面 URL 推回正在被消费的那个流中。

也许下面的代码能给你一些启发:

import cheerio from 'cheerio';
import Rx from 'rx';
import fetch from 'isomorphic-fetch';

/**
 * Downloads `url` and loads the response body into cheerio.
 *
 * @param {string} url - page to download.
 * @returns {Rx.Observable} emits the cheerio root (`$`) once. It errors if the
 *   request fails or the server answers with a non-2xx status.
 */
function getCheerio(url) {
  const promise = fetch(url)
    .then(response => {
      // fetch() resolves on HTTP errors too; don't parse a 404/500 page as data.
      if (!response.ok) {
        throw new Error(`Request to ${url} failed with HTTP ${response.status}`);
      }
      return response.text();
    })
    .then(body => cheerio.load(body));
  return Rx.Observable.fromPromise(promise);
}

const DIGITAL_NYC_URI = 'http://www.digital.nyc';

// Resolves an href scraped from a page against the site root. Plain
// `DIGITAL_NYC_URI + '/' + href` yields "//" when href already starts with "/".
const toAbsoluteURL = href => new URL(href, DIGITAL_NYC_URI).href;

// A Subject is both an Observer and an Observable, so the pipeline below can
// push the next listing page's URL back into the stream it is consuming.
const pageRequest = new Rx.Subject();

// Listing pages already requested; guards against following pagination links
// in a cycle forever.
const visitedPages = new Set();

function requestPage(pageUrl) {
  visitedPages.add(pageUrl);
  pageRequest.onNext(pageUrl);
}

pageRequest
  .flatMap(pageUrl => getCheerio(pageUrl))
  .flatMap(page$ => {
    // NOTE(review): .attr() reads the first matching link; if "previous" and
    // "next" arrows share this selector, confirm the first one is "next".
    const nextPageHref = page$('ul.pagination li.arrow a').attr('href');
    const nextPageUrl = nextPageHref ? toAbsoluteURL(nextPageHref) : null;
    if (nextPageUrl && !visitedPages.has(nextPageUrl)) {
      // Pipe the next page's URL back into the original observable.
      requestPage(nextPageUrl);
    } else {
      // No further page: completing the Subject lets the whole chain
      // complete once the in-flight profile requests finish.
      pageRequest.onCompleted();
    }

    const profileUrls = page$('h3.node-title a')
      .map((i, anchor) => toAbsoluteURL(page$(anchor).attr('href')))
      .get();
    return Rx.Observable.from(profileUrls);
  })
  .flatMap(url => getCheerio(url))
  .map(profile$ => {
    // build the company profile here
    return profile$('title').text();
  })
  .subscribe(
    value => console.log('profile ', value),
    err => console.error('scrape failed', err),
    () => console.log('done')
  );

requestPage(toAbsoluteURL('/startups'));