Synchronous multiple requests in a for loop with Node.js
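I'm just starting with JavaScript, and I need help figuring out how to make this code synchronous while iterating through a for loop.
Basically, I make multiple POST requests inside the loop, then I scrape the data with the X-Ray library, and finally I save the results to a Mongo database.
The output is fine, but it comes out in no particular order and then suddenly hangs, so I have to force-quit with Ctrl+C. Here is my function: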
const request = require('request');
const querystring = require('querystring');
const Xray = require('x-ray');

const x = Xray();

function getdata() {
  const startYear = 1996;
  const currentYear = 1998; // new Date().getFullYear()
  for (let i = startYear; i <= currentYear; i++) {
    for (let j = 1; j <= 12; j++) {
      if (i === startYear) {
        j = 12;
      }
      // Form to be sent
      const form = {
        year: `${i}`,
        month: `${j}`,
        day: '01',
      };
      const formData = querystring.stringify(form);
      const contentLength = formData.length;
      // Make HTTP Request
      request({
        headers: {
          'Content-Length': contentLength,
          'Content-Type': 'application/x-www-form-urlencoded',
        },
        uri: 'https://www.ipma.pt/pt/geofisica/sismologia/',
        body: formData,
        method: 'POST',
      }, (err, res, html) => {
        if (!err && res.statusCode === 200) {
          // Scraping data with X-Ray
          x(html, '#divID0 > table > tr', {
            date: '.block90w',
            lat: 'td:nth-child(2)',
            lon: 'td:nth-child(3)',
            prof: 'td:nth-child(4)',
            mag: 'td:nth-child(5)',
            local: 'td:nth-child(6)',
            degree: 'td:nth-child(7)',
          })((error, obj) => {
            const result = {
              date: obj.date,
              lat: obj.lat.replace(',', '.'),
              lon: obj.lon.replace(',', '.'),
              prof: obj.prof == '-' ? null : obj.prof.replace(',', '.'),
              mag: obj.mag.replace(',', '.'),
              local: obj.local,
              degree: obj.degree,
            };
            // console.log(result);
            upsertEarthquake(result); // save to DB
          });
        }
      });
    }
  }
}
I think I have to use promises or callbacks, but I don't understand how. I already tried async/await without success. Let me know if you need any other information, thanks.
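You are calling request inside a loop.
Asynchronous functions are functions that deliver their result after the main thread logic has finished (a.k.a. they receive the response in a callback function).
So, if we have this: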
for (var i = 0; i < 12; i++) {
  request({
    data: i
  }, function(error, data) {
    // This is the request result, inside a callback function
  });
}
the loop fires all 12 request calls before any callback is invoked, so the callbacks are queued and only called after the whole main loop has finished.
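To see this ordering without touching the network, here is a small sketch in which request is replaced by a hypothetical fakeRequest that calls back after a random delay. The loop finishes before any callback runs, and the callbacks then arrive in whatever order the delays dictate, which matches the out-of-order output described in the question.

```javascript
// Hypothetical stand-in for `request`: it calls back asynchronously after a random delay,
// the way a real HTTP request answers "some time later".
function fakeRequest(options, callback) {
  const delay = Math.floor(Math.random() * 50);
  setTimeout(() => callback(null, `response for ${options.data}`), delay);
}

const events = [];
for (let i = 0; i < 12; i++) {
  events.push(`sent ${i}`);
  fakeRequest({ data: i }, (error, data) => {
    // Runs only after the loop (and the rest of the synchronous code) has finished.
    events.push(`received ${i}`);
  });
}
events.push('loop finished');

// No callback has run yet: 12 "sent" entries plus "loop finished".
console.log(events.length); // 13

// Once the timers fire, the "received" entries show up in random order.
setTimeout(() => console.log(events.slice(13).join(', ')), 100);
```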
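Without getting into ES6 generators (I think they make it more complicated, and it is better for you to learn what is happening at a lower level), what you have to do is call request, wait for its callback function to be called, and only then call the next request. How? There are many ways, but I usually do it like this: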
var i = 0;

function callNext() {
  if (i >= 12) {
    requestEnded();
  } else {
    request({
      data: i++ // Increment the counter as we are not inside a for loop that increments it
    }, function(error, data) {
      // Do something with the data, and also check if an error was received and act accordingly, which is very much possible when talking about internet requests
      console.log(error, data);
      // Call the next request inside the callback, so we are sure that the next request is run just after this request has ended
      callNext();
    });
  }
}

callNext();

function requestEnded() {
  console.log("Yay");
}
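Here you can see the logic. You have a function called callNext that either makes the next call or, if no more calls are needed, calls requestEnded.
When request is called inside callNext, it waits for the callback to arrive (this happens asynchronously, at some point in the future), processes the received data, and then, inside the callback, you tell it to call callNext again.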
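Applied to the question's loop, the same pattern means building the list of year/month forms first and then letting callNext walk through it one request at a time. This is only an illustration: fakePost is a hypothetical stand-in for the real POST request (it answers after a random delay), and buildForms and runSequentially are names made up for this sketch. buildForms reproduces the question's "if (i === startYear) j = 12" rule, so the first year contributes only December.

```javascript
// Hypothetical stand-in for the real POST request: answers after a random delay
// with a fake 200 response whose body identifies the month that was requested.
function fakePost(form, callback) {
  const delay = Math.floor(Math.random() * 20);
  setTimeout(() => callback(null, { statusCode: 200 }, `${form.year}-${form.month}`), delay);
}

// Same months as the question's nested loop: only December of the first year,
// then every month of the following years.
function buildForms(startYear, endYear) {
  const forms = [];
  for (let year = startYear; year <= endYear; year++) {
    const firstMonth = year === startYear ? 12 : 1;
    for (let month = firstMonth; month <= 12; month++) {
      forms.push({ year: String(year), month: String(month), day: '01' });
    }
  }
  return forms;
}

// Sends one request at a time: the next one starts only inside the previous callback.
function runSequentially(forms, onResult, onDone) {
  let index = 0;
  function callNext() {
    if (index >= forms.length) {
      onDone();
      return;
    }
    const form = forms[index++];
    fakePost(form, (err, res, body) => {
      if (!err && res.statusCode === 200) {
        // Real code would scrape and save here (and call callNext from that callback to wait for it too).
        onResult(body);
      }
      callNext();
    });
  }
  callNext();
}

const received = [];
let finished = false;
runSequentially(buildForms(1996, 1998), body => received.push(body), () => {
  finished = true;
  console.log(received.length); // 25
  console.log(received[0], received[received.length - 1]); // 1996-12 1998-12
});
```

Because each request starts only after the previous one has answered, the results come back in the same order as the forms, at the cost of running the requests one after another.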
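You can create an array from the start and end years, map it to your request configs, and then map each result to what x-ray returns (x-ray returns a promise-like object, so no callback is needed). Then put the scrape results into MongoDB with a function that returns a promise.
If something rejects, create an object of type Fail and resolve with that object instead, so that a single failure does not reject the whole batch.
Use Promise.all to start all the requests, x-ray scrapes and Mongo saves in parallel, but limit the number of active requests with throttle.
The code looks like this: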
// You can get the library containing throttle here:
// https://github.com/amsterdamharu/lib/blob/master/src/index.js
const lib = require('lib');
const request = require('request');
const querystring = require('querystring');
const Xray = require('x-ray');
const x = Xray();
const Fail = function(details){this.details=details;};
const isFail = o=>(o&&o.constructor)===Fail;
const max10 = lib.throttle(10);
const range = lib.range;
const createYearMonth = (startYear,endYear)=>
  range(startYear,endYear)
    .reduce(
      (acc,year)=>
        acc.concat(
          range(1,12).map(month=>({year,month}))
        )
      ,[]
    );
const toRequestConfigs = yearMonths =>
  yearMonths.map(
    yearMonth=>{
      // Send day=01 as well, like the form in the question
      const formData = querystring.stringify({...yearMonth, day: '01'});
      return {
        headers: {
          'Content-Length': formData.length,
          'Content-Type': 'application/x-www-form-urlencoded',
        },
        uri: 'https://www.ipma.pt/pt/geofisica/sismologia/',
        body: formData,
        method: 'POST',
      };
    }
  );
// x-ray returns a promise-like object (it has a then method):
// https://github.com/matthewmueller/x-ray#xraythencb
const scrape = html =>
  x(
    html,
    '#divID0 > table > tr',
    {
      date: '.block90w',
      lat: 'td:nth-child(2)',
      lon: 'td:nth-child(3)',
      prof: 'td:nth-child(4)',
      mag: 'td:nth-child(5)',
      local: 'td:nth-child(6)',
      degree: 'td:nth-child(7)'
    }
  );
const requestAsPromise = config =>
  new Promise(
    (resolve,reject)=>
      request(
        config,
        (err,res,html)=>
          (!err && res.statusCode === 200)
            ? resolve(html)
            // err is null on a non-200 response, so reject with an explicit Error
            : reject(err || new Error(`HTTP status ${res.statusCode}`))
      )
  );
const someMongoStuff = scrapeResult =>
  //do mongo stuff and return promise
  scrapeResult;
const getData = (startYear,endYear) =>
  Promise.all(
    toRequestConfigs(
      createYearMonth(startYear,endYear)
    )
    .map(
      config=>
        //maximum 10 active requests
        max10(requestAsPromise)(config)
          .then(scrape)
          .then(someMongoStuff)
          .catch(//if something goes wrong create a Fail type object
            err => new Fail([err,config.body])
          )
    )
  );
//how to use:
getData(1980,1982)
  .then(//will always resolve unless toRequestConfigs or createYearMonth throws
    result=>{
      //items that were successful
      const successes = result.filter(item=>!isFail(item));
      //items that failed
      const failed = result.filter(isFail);
    }
  );
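If you would rather not depend on that library, a concurrency limiter with the same call shape as max10(requestAsPromise)(config) is short to write. The throttle below is a hypothetical sketch, not the library's implementation: it queues calls and lets at most max of them run at the same time, starting the next queued call whenever one settles. task is a made-up stand-in for requestAsPromise that records how many copies run at once.

```javascript
// Hypothetical concurrency limiter: throttle(max)(fn)(...args) returns a promise, and
// across every function wrapped by the same throttle(max) at most `max` calls run at once.
function throttle(max) {
  let active = 0;
  const queue = [];
  const next = () => {
    if (active >= max || queue.length === 0) return;
    active++;
    const { fn, args, resolve, reject } = queue.shift();
    Promise.resolve()
      .then(() => fn(...args))
      .then(resolve, reject)
      .finally(() => {
        active--; // a slot is free again
        next(); // start the next queued call, if any
      });
  };
  return fn => (...args) =>
    new Promise((resolve, reject) => {
      queue.push({ fn, args, resolve, reject });
      next();
    });
}

// Hypothetical task standing in for requestAsPromise; it tracks how many copies run at once.
let running = 0;
let peak = 0;
const task = n =>
  new Promise(resolve => {
    running++;
    peak = Math.max(peak, running);
    setTimeout(() => {
      running--;
      resolve(n * 2);
    }, 10);
  });

const max3 = throttle(3);
const results = Promise.all([1, 2, 3, 4, 5, 6, 7].map(n => max3(task)(n)));
results.then(values => {
  console.log(values.join(' ')); // 2 4 6 8 10 12 14
  console.log(peak); // 3
});
```

Promise.all still returns the results in input order, even though the calls finish at different times.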
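A common problem with scraping is that the target site does not allow more than x requests in a period of y; if you exceed that, it starts blacklisting your IP and refusing service.
Suppose you want to limit it to 10 requests every 5 seconds; then you can change the max10 definition to: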
const max10 = lib.throttlePeriod(10,5000);
The rest of the code stays the same.
Your problem is a synchronous for...loop with asynchronous methods inside it.
A clean way to solve this is the ES2017 async/await syntax.
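The core idea, independent of request and x-ray, is that await pauses the async function until a promise settles, so a plain for loop becomes sequential once each callback-style call is wrapped in a promise. Node's util.promisify can do that wrapping for any function whose last argument is an (err, result) callback. In this sketch slowDouble is a hypothetical callback-style function standing in for a request.

```javascript
const util = require('util');

// Hypothetical callback-style API: calls back with (err, result) some time later.
function slowDouble(n, callback) {
  setTimeout(() => callback(null, n * 2), Math.floor(Math.random() * 20));
}

// util.promisify turns it into a function that returns a promise instead.
const slowDoubleAsync = util.promisify(slowDouble);

async function runInOrder() {
  const results = [];
  for (let i = 1; i <= 5; i++) {
    // The loop does not advance until this call has answered.
    results.push(await slowDoubleAsync(i));
  }
  return results;
}

runInOrder().then(results => console.log(results.join(' '))); // 2 4 6 8 10
```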
Assuming you want each iteration to wait until upsertEarthquake(result) has been called, you should change your code to something like this:
const request = require('request');
const querystring = require('querystring');
const Xray = require('x-ray');

const x = Xray();

async function getdata() {
  const startYear = 1996;
  const currentYear = 1998; // new Date().getFullYear()
  for (let i = startYear; i <= currentYear; i++) {
    for (let j = 1; j <= 12; j++) {
      if (i === startYear)
        j = 12;
      // Form to be sent
      const form = {
        year: `${i}`,
        month: `${j}`,
        day: '01',
      };
      const formData = querystring.stringify(form);
      const contentLength = formData.length;
      // Make HTTP Request
      await new Promise((next, reject) => {
        request({
          headers: {
            'Content-Length': contentLength,
            'Content-Type': 'application/x-www-form-urlencoded',
          },
          uri: 'https://www.ipma.pt/pt/geofisica/sismologia/',
          body: formData,
          method: 'POST',
        }, (err, res, html) => {
          if (err || res.statusCode !== 200)
            return next(); // If there is an error, jump to the next iteration
          // Scraping data with X-Ray
          x(html, '#divID0 > table > tr', {
            date: '.block90w',
            lat: 'td:nth-child(2)',
            lon: 'td:nth-child(3)',
            prof: 'td:nth-child(4)',
            mag: 'td:nth-child(5)',
            local: 'td:nth-child(6)',
            degree: 'td:nth-child(7)',
          })((error, obj) => {
            if (error)
              return next(); // Scraping failed: skip this month instead of leaving the loop waiting forever
            const result = {
              date: obj.date,
              lat: obj.lat.replace(',', '.'),
              lon: obj.lon.replace(',', '.'),
              prof: obj.prof == '-' ? null : obj.prof.replace(',', '.'),
              mag: obj.mag.replace(',', '.'),
              local: obj.local,
              degree: obj.degree,
            };
            // console.log(result);
            upsertEarthquake(result); // save to DB
            next(); // This makes the loop jump to the next for... iteration
          });
        });
      });
    }
  }
}
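I am assuming that upsertEarthquake is an asynchronous function or of the fire-and-forget type.
If there is an error you can use next() to skip to the next iteration, but if you want to break out of the loop, use reject() instead; the await then throws, the loop stops, and the promise returned by getdata() rejects: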
if (err || res.statusCode !== 200)
  return reject(err || new Error(`HTTP status ${res.statusCode}`)); // err is null on a non-200 response
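To see the difference between skipping and breaking in isolation, here is a small sketch with a hypothetical fetchMonth that fails for month 3. Continuing after the error has the same effect as calling next(); breaking out of the loop has the same effect as reject() stopping getdata. The sketch also awaits the save step, which is what you would do if upsertEarthquake returns a promise rather than being fire-and-forget; fakeUpsert and processMonths are likewise made up for this example.

```javascript
// Hypothetical request stand-in: fails for month 3, succeeds otherwise.
function fetchMonth(month) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      if (month === 3) reject(new Error(`month ${month} failed`));
      else resolve(`data for ${month}`);
    }, 5);
  });
}

// Hypothetical save step that returns a promise, like a Mongo upsert would.
function fakeUpsert(store, data) {
  return new Promise(resolve => setTimeout(() => { store.push(data); resolve(); }, 5));
}

// stopOnError = false skips the failed month (like next());
// stopOnError = true stops the loop at the first failure (like reject()).
async function processMonths(stopOnError) {
  const saved = [];
  for (let month = 1; month <= 5; month++) {
    try {
      const data = await fetchMonth(month);
      await fakeUpsert(saved, data); // wait for the save before moving on
    } catch (err) {
      console.log(`error: ${err.message}`);
      if (stopOnError) break;
    }
  }
  return saved;
}

processMonths(false)
  .then(saved => {
    console.log(saved.length); // 4 (month 3 skipped)
    return processMonths(true);
  })
  .then(saved => console.log(saved.length)); // 2 (loop stopped at month 3)
```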