Synchronous Multiple Requests in a for Loop with Node.js

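I'm just getting started with JavaScript and need help figuring out how to make this code run synchronously while iterating through a for loop. Basically, I make several POST requests inside the loop, scrape the data with the X-Ray library, and finally save the results to a MongoDB database. The output is fine, but it comes out in no particular order and then suddenly hangs, and I have to force-quit with Ctrl+C. Here is my function: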

const request = require('request');
const querystring = require('querystring');
const Xray = require('x-ray');
const x = Xray();
// upsertEarthquake(result) saves one record to MongoDB and is defined elsewhere

function getdata() {
  const startYear = 1996;
  const currentYear = 1998; // new Date().getFullYear()

  for (let i = startYear; i <= currentYear; i++) {
    for (let j = 1; j <= 12; j++) {
      if (i === startYear) {
        j = 12;
      }

      // Form to be sent
      const form = {
        year: `${i}`,
        month: `${j}`,
        day: '01',
      };

      const formData = querystring.stringify(form);
      const contentLength = formData.length;

      // Make HTTP Request
      request({
        headers: {
          'Content-Length': contentLength,
          'Content-Type': 'application/x-www-form-urlencoded',
        },
        uri: 'https://www.ipma.pt/pt/geofisica/sismologia/',
        body: formData,
        method: 'POST',
      }, (err, res, html) => {
        if (!err && res.statusCode === 200) {
          // Scraping data with X-Ray
          x(html, '#divID0 > table > tr', {
            date: '.block90w',
            lat: 'td:nth-child(2)',
            lon: 'td:nth-child(3)',
            prof: 'td:nth-child(4)',
            mag: 'td:nth-child(5)',
            local: 'td:nth-child(6)',
            degree: 'td:nth-child(7)',
          })((error, obj) => {
            const result = {
              date: obj.date,
              lat: obj.lat.replace(',', '.'),
              lon: obj.lon.replace(',', '.'),
              prof: obj.prof == '-' ? null : obj.prof.replace(',', '.'),
              mag: obj.mag.replace(',', '.'),
              local: obj.local,
              degree: obj.degree,
            };

            // console.log(result);

            upsertEarthquake(result); // save to DB
          });
        }
      });
    }
  }
}

I think I have to use promises or callbacks, but I don't understand how. I already tried async/await without success. Let me know if you need any other information, thanks.

You are calling request inside a loop.

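An asynchronous function is one whose result arrives later, after the currently running main-thread logic has finished (that is, the response is received in a callback function).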

So if we have this:

for (var i = 0; i < 12; i++) {
    request({
        data: i
    }, function(error, data) {
        // This is the request result, inside a callback function
    });
}

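The loop sends all 12 requests before any callback is called, so the callbacks are queued and only run after the whole main loop has finished, in whatever order the responses happen to arrive.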
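To see this ordering for yourself, here is a small sketch that replaces `request` with a hypothetical `fakeRequest` (an assumption: it performs no real HTTP, it just calls back after a random delay). Every request is sent before any callback runs, and the callbacks then arrive in whatever order the fake responses finish.

```javascript
// Hypothetical stand-in for `request`: calls back after a random delay (no real HTTP).
function fakeRequest(options, callback) {
  const delay = Math.random() * 50;
  setTimeout(() => callback(null, `response ${options.data}`), delay);
}

const events = [];

for (let i = 0; i < 3; i++) {
  events.push(`sent ${i}`);
  fakeRequest({ data: i }, (error, data) => {
    // Runs later, after the loop (and all other synchronous code) has finished
    events.push(`received ${data}`);
    console.log(`received ${data}`);
  });
}
events.push('loop finished');

// No callback has run yet at this point
console.log(events); // [ 'sent 0', 'sent 1', 'sent 2', 'loop finished' ]
```

Run it a few times: the `received ...` lines come out in a different order each time, which is the same "unordered output" described in the question.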

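Without getting into ES6 generators (I think they would make this more complicated, and it's better for you to learn what is happening at a lower level), what you need to do is call request, wait for its callback to be called, and only then call the next request. How? There are many ways, but this is what I usually do: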

var i= 0;
function callNext() {
    if (i>= 12) {
        requestEnded();
    } else {
        request({
            data: i++ // Increment the counter as we are not inside a for loop that increments it
        }, function(error, data) {
            // Do something with the data, and also check if an error was received and act accordingly, which is very much possible when talking about internet requests
            console.log(error, data);
            // Call the next request inside the callback, so we are sure that the next request is ran just after this request has ended
            callNext();
        })
    }
}
callNext();

function requestEnded() {
    console.log("Yay");
}

Here you can see the logic. You have a function called callNext that either makes the next call or, if no more calls are needed, calls requestEnded.

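When request is called inside callNext, it waits for its callback to be received (this happens asynchronously, at some point in the future), the callback processes the received data, and then, inside that callback, you tell it to call callNext again.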
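Applied to the question's nested year/month loops, the same callNext pattern could look roughly like this. It is a sketch under assumptions: the forms are built into an array first (replacing the question's `j = 12` trick with an explicit first month), and `processForm` is a hypothetical stand-in for the real POST + X-Ray + upsertEarthquake work.

```javascript
const querystring = require('querystring');

// Build the (year, month) forms the question's nested loops iterate over.
// As in the question, the first year only includes December.
function buildForms(startYear, endYear) {
  const forms = [];
  for (let year = startYear; year <= endYear; year++) {
    const firstMonth = year === startYear ? 12 : 1;
    for (let month = firstMonth; month <= 12; month++) {
      forms.push({ year: `${year}`, month: `${month}`, day: '01' });
    }
  }
  return forms;
}

// Hypothetical stand-in for the real request + scrape + save step.
function processForm(form, callback) {
  const body = querystring.stringify(form);
  setTimeout(() => callback(null, body), 10);
}

const forms = buildForms(1996, 1998);
const handled = [];
let index = 0;

function callNext() {
  if (index >= forms.length) {
    console.log(`done: ${handled.length} months processed in order`);
    return;
  }
  const form = forms[index++];
  processForm(form, (error, body) => {
    if (error) console.error(error); // log the failure and keep going
    else handled.push(body);
    callNext(); // start the next request only after this one has finished
  });
}

callNext();
```

Because the next request is only started from inside the previous callback, there is never more than one request in flight, and `handled` fills up in month order.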

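You could create an array from the start and end years, map it to request configs, and then map each request's result to what x-ray returns (x-ray returns a promise-like object, so no callback is needed). Then put the scrape results into MongoDB with a function that returns a promise.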

If something rejects, create an object of type Fail and resolve with that object instead.
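Why resolve with a `Fail` object instead of letting the promise reject? `Promise.all` rejects as soon as any one input rejects, so a single failed month would hide all the successful ones. The sketch below uses a hypothetical `work` function that fails for odd ids to show the effect. As a swapped-in alternative, Node 12.9 and later also provide the built-in `Promise.allSettled`, which reports every outcome without a custom type.

```javascript
// Same Fail type and check as in this answer
const Fail = function (details) { this.details = details; };
const isFail = o => (o && o.constructor) === Fail;

// Hypothetical task: rejects for odd ids, resolves for even ones
const work = id =>
  id % 2 === 1 ? Promise.reject(new Error(`failed ${id}`)) : Promise.resolve(id);

const ids = [1, 2, 3, 4];

// Turning each rejection into a Fail value means Promise.all never short-circuits
Promise.all(ids.map(id => work(id).catch(err => new Fail([err, id])))).then(results => {
  console.log(results.filter(r => !isFail(r))); // [ 2, 4 ]
  console.log(results.filter(isFail).map(f => f.details[1])); // [ 1, 3 ]
});

// Built-in alternative: one { status, value | reason } entry per input
Promise.allSettled(ids.map(work)).then(outcomes => {
  console.log(outcomes.map(o => o.status)); // [ 'rejected', 'fulfilled', 'rejected', 'fulfilled' ]
});
```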

Use Promise.all to start all the requests, x-ray scrapes and Mongo writes in parallel, but limit the number of active requests with throttle.

The code looks like this:

const querystring = require('querystring');
const request = require('request');
const x = require('x-ray')();
//you can get library containing throttle here:
//  https://github.com/amsterdamharu/lib/blob/master/src/index.js
//(adjust the require path to wherever you saved that file)
const lib = require('lib');
const Fail = function(details){this.details=details;};
const isFail = o=>(o&&o.constructor)===Fail;
const max10 = lib.throttle(10);
const range = lib.range;
const createYearMonth = (startYear,endYear)=>
  range(startYear,endYear)
  .reduce(
    (acc,year)=>
      acc.concat(
        range(1,12).map(month=>({year,month}))
      )
    ,[]
  );
const toRequestConfigs = yearMonths =>
  yearMonths.map(
    yearMonth=>{
      const formData = querystring.stringify(yearMonth);
      return {
        headers: {
          'Content-Length': formData.length,
          'Content-Type': 'application/x-www-form-urlencoded',
        },
        uri: 'https://www.ipma.pt/pt/geofisica/sismologia/',
        body: formData,
        method: 'POST',
      };
    }
  );
const scrape = html =>
  //x-ray returns a promise-like (thenable) object, so .then(scrape) waits for it:
  // https://github.com/matthewmueller/x-ray#xraythencb
  x(
    html,
    '#divID0 > table > tr',
    {
      date: '.block90w',
      lat: 'td:nth-child(2)',
      lon: 'td:nth-child(3)',
      prof: 'td:nth-child(4)',
      mag: 'td:nth-child(5)',
      local: 'td:nth-child(6)',
      degree: 'td:nth-child(7)'
    }
  );
const requestAsPromise = config =>
  new Promise(
    (resolve,reject)=>
      request(
        config,
        (err,res,html)=>
          (!err && res.statusCode === 200)
            ? resolve(html)
            //err is null for a non-200 response, so reject with an Error instead
            : reject(err || new Error(`HTTP ${res.statusCode}`))
      )
  );
const someMongoStuff = scrapeResult =>
  //do mongo stuff and return promise
  scrapeResult;
const getData = (startYear,endYear) =>
  Promise.all(
    toRequestConfigs(
      createYearMonth(startYear,endYear)
    )
    .map(
      config=>
        //maximum 10 active requests
        max10(requestAsPromise)(config)
        .then(scrape)
        .then(someMongoStuff)
        .catch(//if something goes wrong create a Fail type object
          err => new Fail([err,config.body])
        )
    )
  )
//how to use:
getData(1980,1982)
.then(//will always resolve unless toRequestConfigs or createYearMonth throws
  result=>{
    //items that were successful
    const successes = result.filter(item=>!isFail(item));
    //items that failed
    const failed = result.filter(isFail);
    console.log(successes.length, 'succeeded,', failed.length, 'failed');
  }
)
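This answer relies on `lib.throttle` from the linked repository, whose implementation isn't shown here. As a hedged illustration of the idea, here is a hypothetical minimal `throttle(max)` with the same call shape, `throttle(max)(fn)(...args)`, that keeps at most `max` wrapped calls running at once and queues the rest. It is a sketch, not the library's code.

```javascript
// Hypothetical minimal concurrency limiter (not the implementation from the linked lib).
function throttle(max) {
  let active = 0;   // calls currently running
  const queue = []; // calls waiting for a free slot

  const next = () => {
    if (active >= max || queue.length === 0) return;
    active++;
    const { fn, args, resolve, reject } = queue.shift();
    // new Promise(...) calls fn right away and turns a synchronous throw into a rejection
    new Promise(res => res(fn(...args)))
      .then(resolve, reject)
      .finally(() => {
        active--;
        next(); // a slot is free, start the next queued call
      });
  };

  return fn => (...args) =>
    new Promise((resolve, reject) => {
      queue.push({ fn, args, resolve, reject });
      next();
    });
}

// Usage: five slow tasks, but never more than two in flight.
const max2 = throttle(2);
let started = 0;
let inFlight = 0;
let peak = 0;
const slowTask = id =>
  new Promise(resolve => {
    started++;
    inFlight++;
    peak = Math.max(peak, inFlight);
    setTimeout(() => {
      inFlight--;
      resolve(id);
    }, 10);
  });

Promise.all([1, 2, 3, 4, 5].map(id => max2(slowTask)(id)))
  .then(ids => console.log(ids, 'peak in flight:', peak)); // [ 1, 2, 3, 4, 5 ] peak in flight: 2
```

In this sketch the counter lives in the closure returned by `throttle(max)`, so every function wrapped by the same `max2` shares one limit; that shared limit is what a call like `max10(requestAsPromise)(config)` needs.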

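A common problem when scraping is that the target site does not allow more than x requests within a period y; if you exceed that, it starts blacklisting your IP and refusing service.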

Suppose you want to limit it to 10 requests every 5 seconds; then you can change the code above to:

const max10 = lib.throttlePeriod(10,5000);

The rest of the code stays the same.
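For readers without the linked library, here is a hypothetical sketch of what a `throttlePeriod(max, periodMs)` could do: allow at most `max` calls to start within any `periodMs` window and delay the rest. The name and call shape mirror the line above, but the behaviour of the real `lib.throttlePeriod` is an assumption; this is only an illustration of rate limiting by time window.

```javascript
// Hypothetical sketch: at most `max` wrapped calls may START within any `periodMs` window.
function throttlePeriod(max, periodMs) {
  const starts = []; // timestamps of calls started inside the current window
  const queue = [];  // calls waiting for a free slot
  let timer = null;

  const next = () => {
    const now = Date.now();
    // Forget starts that have left the window
    while (starts.length > 0 && now - starts[0] >= periodMs) starts.shift();
    // Start as many queued calls as the window allows
    while (queue.length > 0 && starts.length < max) {
      starts.push(now);
      const { fn, args, resolve, reject } = queue.shift();
      new Promise(res => res(fn(...args))).then(resolve, reject);
    }
    // Still waiting? Wake up when the oldest start leaves the window
    if (queue.length > 0 && timer === null) {
      const wait = Math.max(periodMs - (now - starts[0]), 1);
      timer = setTimeout(() => {
        timer = null;
        next();
      }, wait);
    }
  };

  return fn => (...args) =>
    new Promise((resolve, reject) => {
      queue.push({ fn, args, resolve, reject });
      next();
    });
}

// Usage: 5 calls, at most 2 started per 100 ms.
const max2Per100ms = throttlePeriod(2, 100);
const t0 = Date.now();
const startedAt = [];
const stamp = id => {
  startedAt.push(Date.now() - t0); // record when each call actually started
  return id;
};

Promise.all([1, 2, 3, 4, 5].map(id => max2Per100ms(stamp)(id)))
  .then(ids => console.log(ids, startedAt));
```

The start times come out in pairs roughly 100 ms apart; the exact millisecond values depend on timer precision.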

Your problem is that you have a synchronous for...loop with asynchronous methods inside it.

A clean way to solve this is with ES2017 async/await syntax.

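Assuming you want each iteration to wait until upsertEarthquake(result) has been called before moving on, you should change your code to something like this: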

const request = require('request');
const querystring = require('querystring');
const x = require('x-ray')();

async function getdata() {
    const startYear = 1996;
    const currentYear = 1998; // new Date().getFullYear()

    for (let i = startYear; i <= currentYear; i++) {
        for (let j = 1; j <= 12; j++) {
            if (i === startYear)
                j = 12;

            // Form to be sent
            const form = {
                year: `${i}`,
                month: `${j}`,
                day: '01',
            };

            const formData = querystring.stringify(form);
            const contentLength = formData.length;
            // Make HTTP Request and wait here until next() is called
            await new Promise((next, reject) => {
                request({
                    headers: {
                        'Content-Length': contentLength,
                        'Content-Type': 'application/x-www-form-urlencoded',
                    },
                    uri: 'https://www.ipma.pt/pt/geofisica/sismologia/',
                    body: formData,
                    method: 'POST',
                }, (err, res, html) => {
                    if (err || res.statusCode !== 200)
                        return next(); // If there is an error, jump to the next iteration

                    // Scraping data with X-Ray
                    x(html, '#divID0 > table > tr', {
                        date: '.block90w',
                        lat: 'td:nth-child(2)',
                        lon: 'td:nth-child(3)',
                        prof: 'td:nth-child(4)',
                        mag: 'td:nth-child(5)',
                        local: 'td:nth-child(6)',
                        degree: 'td:nth-child(7)',
                    })((error, obj) => {
                        if (error)
                            return next(); // Scraping failed, jump to the next iteration
                        const result = {
                            date: obj.date,
                            lat: obj.lat.replace(',', '.'),
                            lon: obj.lon.replace(',', '.'),
                            prof: obj.prof == '-' ? null : obj.prof.replace(',', '.'),
                            mag: obj.mag.replace(',', '.'),
                            local: obj.local,
                            degree: obj.degree,
                        };
                        //console.log(result);
                        upsertEarthquake(result); // save to DB
                        next(); // This makes the loop jump to the next for... iteration
                    });
                });
            });
        }
    }
}

I'm assuming upsertEarthquake is either an asynchronous function or a fire-and-forget call.

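If there is an error you can call next() to skip to the next iteration, but if you want to break out of the loop, use reject(); the promise returned by getdata() then rejects, so handle it with .catch() or try/catch: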

if (err || res.statusCode !== 200)
    return reject(err || new Error(`HTTP ${res.statusCode}`)) // err is null for a non-200 response
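A common way to keep this loop readable is to wrap the callback API in a promise once and then `await` it inside a plain `for...of` loop, using `try/catch` to choose between skipping a month (like `next()`) and stopping the loop (like `reject()`). In this sketch, `fetchMonthCallback` is a hypothetical stand-in for the request + X-Ray step (it fails for February to exercise the error path), `util.promisify` is Node's built-in helper for `(err, value)`-style callbacks, and the `saved.push` line marks where `await upsertEarthquake(result)` would go if it returns a promise.

```javascript
const { promisify } = require('util');

// Hypothetical stand-in for one POST + X-Ray scrape, in Node's (err, result) callback style.
function fetchMonthCallback(form, callback) {
  setTimeout(() => {
    if (form.month === '2') callback(new Error(`no data for ${form.year}-${form.month}`));
    else callback(null, { year: form.year, month: form.month });
  }, 10);
}

// Wrap the callback API once so it can be awaited.
const fetchMonth = promisify(fetchMonthCallback);

const started = []; // months whose fetch has begun, in order

async function getData(forms, { stopOnError = false } = {}) {
  const saved = [];
  for (const form of forms) {
    started.push(form.month);
    try {
      const result = await fetchMonth(form); // waits before the next iteration starts
      saved.push(result); // real code: await upsertEarthquake(result)
    } catch (err) {
      if (stopOnError) throw err; // like reject(): abandon the whole loop
      console.error(err.message); // like next(): skip this month and continue
    }
  }
  return saved;
}

const forms = ['1', '2', '3'].map(month => ({ year: '1997', month, day: '01' }));

getData(forms)
  .then(saved => console.log(`saved ${saved.length} months`)) // saved 2 months
  .catch(err => console.error('stopped:', err.message));
```

Calling `getData(forms, { stopOnError: true })` instead would reject at February, and the `.catch` handler would print `stopped: no data for 1997-2`.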