运行 块中的 HTTP 请求
Run HTTP requests in chunks
我想 运行 1 在可配置的块中发送 HTTP 请求,并在块请求之间设置可配置的超时。该请求基于 some.csv 文件提供的数据。
它不起作用,因为我得到一个 TypeError,但是当我在 f 之后删除 () 时,它也不起作用。
如果能提供一点帮助,我将不胜感激。可能最大的问题是我并不真正理解 promises 究竟是如何工作的,但我尝试了多种解决方案,但我无法实现我想要的。
超时功能可能会让我更加头疼,所以如果有任何提示,我将不胜感激。
你能帮我理解为什么它不起作用吗?
这是片段:
const rp = require('request-promise');
const fs = require('fs');
const { chunk } = require('lodash');
const BATCH_SIZE = 2;
const QUERY_PARAMS = ['clientId', 'time', 'changeTime', 'newValue'];
async function update(id, time, query) {
const options = {
method: 'POST',
uri: `https://requesturl/${id}?query=${query}`,
body: {
"prop": {
"time": time
}
},
headers: {
"Content-Type": "application/json"
},
json: true
}
return async () => { return await rp(options) };
}
async function batchRequestRunner(data) {
const promises = [];
for (row of data) {
row = row.split(',');
promises.push(update(row[0], row[1], QUERY_PARAMS.join(',')));
}
const batches = chunk(promises, BATCH_SIZE);
for (let batch of batches) {
try {
Promise.all(
batch.map(async f => { return await f();})
).then((resp) => console.log(resp));
} catch (e) {
console.log(e);
}
}
}
async function main() {
const input = fs.readFileSync('./input.test.csv').toString().split("\n");
const requestData = input.slice(1);
await batchRequestRunner(requestData);
}
main();
对第一条评论的澄清:
我有一个如下所示的 csv 文件:
clientId,startTime
123,13:40:00
321,13:50:00
文件大小约为 100k 行
该文件包含如何更新数据库中特定 clientId 的时间的信息。我无权访问数据库,但我可以访问 API,它允许更新数据库中的条目。
我不能一次拨打 100k 个电话,因为:我的网络有限(我因为冠状病毒而远程工作),它消耗了大量内存,并且 API 也可能受到限制,如果我发出所有请求可能会崩溃立刻。
我想达到的目标:
将csv加载到内存中,将其转换为数组
分块处理api请求,例如从数组中取前两行,根据前两行进行API调用,等待1000ms,再取两行行,并继续处理直到数组结束(csv 文件)
好吧,这似乎是一个有点经典的案例,您希望通过一些异步操作处理一组值,并避免消耗太多资源或使目标服务器不堪重负,您希望拥有的不超过N 个请求同时在飞行中。这是一个常见问题,有针对它的预建解决方案。我的 goto 解决方案是一小段名为 mapConcurrent()
的代码。它与 array.map()
类似,但它假设有一个返回承诺的异步回调,并且您向它传递了应该同时运行的最大项目数。然后它 returns 给你一个解析为结果数组的承诺。
这里是mapConcurrent()
:
// takes an array of items and a function that returns a promise
// returns a promise that resolves to an array of results
function mapConcurrent(items, maxConcurrent, fn) {
let index = 0;
let inFlightCntr = 0;
let doneCntr = 0;
let results = new Array(items.length);
let stop = false;
return new Promise(function(resolve, reject) {
function runNext() {
let i = index;
++inFlightCntr;
fn(items[index], index++).then(function(val) {
++doneCntr;
--inFlightCntr;
results[i] = val;
run();
}, function(err) {
// set flag so we don't launch any more requests
stop = true;
reject(err);
});
}
function run() {
// launch as many as we're allowed to
while (!stop && inflightCntr < maxConcurrent && index < items.length) {
runNext();
}
// if all are done, then resolve parent promise with results
if (doneCntr === items.length) {
resolve(results);
}
}
run();
});
}
然后您的代码可以被构造为像这样使用它:
function update(id, time, query) {
const options = {
method: 'POST',
uri: `https://requesturl/${id}?query=${query}`,
body: {
"prop": {
"time": time
}
},
headers: {
"Content-Type": "application/json"
},
json: true
}
return rp(options);
}
function processRow(row) {
let rowData = row.split(",");
return update(rowData[0], rowData[1], rowData[2]);
}
function main() {
const input = fs.readFileSync('./input.test.csv').toString().split("\n");
const requestData = input.slice(1);
// process this entire array with up to 5 requests "in-flight" at the same time
mapConcurrent(requestData, 5, processRow).then(results => {
console.log(results);
}).catch(err => {
console.log(err);
});
}
您显然可以将并发请求数调整为您想要的任何数量。我在这个例子中将它设置为 5。
我想 运行 1 在可配置的块中发送 HTTP 请求,并在块请求之间设置可配置的超时。该请求基于 some.csv 文件提供的数据。
它不起作用,因为我得到一个 TypeError,但是当我在 f 之后删除 () 时,它也不起作用。 如果能提供一点帮助,我将不胜感激。可能最大的问题是我并不真正理解 promises 究竟是如何工作的,但我尝试了多种解决方案,但我无法实现我想要的。
超时功能可能会让我更加头疼,所以如果有任何提示,我将不胜感激。
你能帮我理解为什么它不起作用吗?
这是片段:
const rp = require('request-promise');
const fs = require('fs');
const { chunk } = require('lodash');
const BATCH_SIZE = 2;
const QUERY_PARAMS = ['clientId', 'time', 'changeTime', 'newValue'];
async function update(id, time, query) {
const options = {
method: 'POST',
uri: `https://requesturl/${id}?query=${query}`,
body: {
"prop": {
"time": time
}
},
headers: {
"Content-Type": "application/json"
},
json: true
}
return async () => { return await rp(options) };
}
async function batchRequestRunner(data) {
const promises = [];
for (row of data) {
row = row.split(',');
promises.push(update(row[0], row[1], QUERY_PARAMS.join(',')));
}
const batches = chunk(promises, BATCH_SIZE);
for (let batch of batches) {
try {
Promise.all(
batch.map(async f => { return await f();})
).then((resp) => console.log(resp));
} catch (e) {
console.log(e);
}
}
}
async function main() {
const input = fs.readFileSync('./input.test.csv').toString().split("\n");
const requestData = input.slice(1);
await batchRequestRunner(requestData);
}
main();
对第一条评论的澄清:
我有一个如下所示的 csv 文件:
clientId,startTime
123,13:40:00
321,13:50:00
文件大小约为 100k 行 该文件包含如何更新数据库中特定 clientId 的时间的信息。我无权访问数据库,但我可以访问 API,它允许更新数据库中的条目。 我不能一次拨打 100k 个电话,因为:我的网络有限(我因为冠状病毒而远程工作),它消耗了大量内存,并且 API 也可能受到限制,如果我发出所有请求可能会崩溃立刻。
我想达到的目标:
将csv加载到内存中,将其转换为数组
分块处理api请求,例如从数组中取前两行,根据前两行进行API调用,等待1000ms,再取两行行,并继续处理直到数组结束(csv 文件)
好吧,这似乎是一个有点经典的案例,您希望通过一些异步操作处理一组值,并避免消耗太多资源或使目标服务器不堪重负,您希望拥有的不超过N 个请求同时在飞行中。这是一个常见问题,有针对它的预建解决方案。我的 goto 解决方案是一小段名为 mapConcurrent()
的代码。它与 array.map()
类似,但它假设有一个返回承诺的异步回调,并且您向它传递了应该同时运行的最大项目数。然后它 returns 给你一个解析为结果数组的承诺。
这里是mapConcurrent()
:
// takes an array of items and a function that returns a promise
// returns a promise that resolves to an array of results
function mapConcurrent(items, maxConcurrent, fn) {
let index = 0;
let inFlightCntr = 0;
let doneCntr = 0;
let results = new Array(items.length);
let stop = false;
return new Promise(function(resolve, reject) {
function runNext() {
let i = index;
++inFlightCntr;
fn(items[index], index++).then(function(val) {
++doneCntr;
--inFlightCntr;
results[i] = val;
run();
}, function(err) {
// set flag so we don't launch any more requests
stop = true;
reject(err);
});
}
function run() {
// launch as many as we're allowed to
while (!stop && inflightCntr < maxConcurrent && index < items.length) {
runNext();
}
// if all are done, then resolve parent promise with results
if (doneCntr === items.length) {
resolve(results);
}
}
run();
});
}
然后您的代码可以被构造为像这样使用它:
function update(id, time, query) {
const options = {
method: 'POST',
uri: `https://requesturl/${id}?query=${query}`,
body: {
"prop": {
"time": time
}
},
headers: {
"Content-Type": "application/json"
},
json: true
}
return rp(options);
}
function processRow(row) {
let rowData = row.split(",");
return update(rowData[0], rowData[1], rowData[2]);
}
function main() {
const input = fs.readFileSync('./input.test.csv').toString().split("\n");
const requestData = input.slice(1);
// process this entire array with up to 5 requests "in-flight" at the same time
mapConcurrent(requestData, 5, processRow).then(results => {
console.log(results);
}).catch(err => {
console.log(err);
});
}
您显然可以将并发请求数调整为您想要的任何数量。我在这个例子中将它设置为 5。