如何将 Promise.all() 限制为每秒 5 个承诺?
How to throttle Promise.all() to 5 promises per second?
我有几个项目需要向第 3 方 API 查询,并且说 API 的呼叫限制为每秒 5 次呼叫。我需要以某种方式将对 API 的调用限制为每秒最多 5 次调用。
到目前为止,我只是在一系列承诺上使用了 Promise.all()
,其中每个承诺向 API 发送请求并在 API 以 HTTP 状态响应时解析代码 200
并在它以其他状态代码响应时拒绝。但是,当我在数组中有超过 5 个项目时,我冒着 Promise.all()
拒绝的风险。
如何将 Promise.all()
调用限制为每秒 5 次调用?
希望对您有所帮助。
而且还要说这将使用 Promise.all
来解决所有请求,如果您有大量查询,这将等待所有问题都解决并且可能导致您的代码等待很多所有回应。
而且,如果其中一个请求被拒绝,Promise.all
将被拒绝。
我建议,如果您不需要所有结果,最好使用其他工具,例如 lodash debounce or throttle 或处理此问题的框架。
let items = [
{name: 'item1'},
{name: 'item2'},
{name: 'item3'},
{name: 'item4'},
{name: 'item5'},
{name: 'item6'}
];
// This is the api request that you send and return a promise
function apiCall(item) {
return new Promise((resolve) => {
setTimeout(() => resolve(item.name), 1000);
})
}
new Promise((resolve) => {
let results = [];
function sendReq (itemsList, iterate, apiCall) {
setTimeout(() => {
// slice itemsList to send request according to the api limit
let slicedArray = itemsList.slice(iterate * 5, (iterate * 5 + 5));
result = slicedArray.map(item => apiCall(item));
results = [...results, ...result];
// This will resolve the promise when reaches to the last iteration
if (iterate === Math.ceil(items.length / 5) - 1) {
resolve(results);
}
}, (1000 * iterate)); // every 1000ms runs (api limit of one second)
}
// This will make iteration to split array (requests) to chunks of five items
for (i = 0; i < Math.ceil(items.length / 5); i++) {
sendReq(items, i, apiCall);
}
}).then(Promise.all.bind(Promise)).then(console.log);
// Use Promise.all to wait for all requests to resolve
// To use it this way binding is required
如果您不太担心按顺序解决承诺,您可以使用 bluebird 中的并发选项。
下面一次只能处理 5 个查询。
const Promise = require('bluebird');
const buildQueries = (count) => {
let queries = [];
for(let i = 0; i < count; i++) {
queries.push({user: i});
};
return queries;
};
const apiCall = (item) => {
return new Promise(async (resolve, reject) => {
await Promise.delay(1000);
resolve(item.user);
});
};
const queries = buildQueries(20);
Promise.map(queries, async query => {
console.log( await apiCall(query) );
}, {concurrency: 5});
在没有库的情况下使用 ES6
export async function asyncForEach(array, callback) {
for (let index = 0; index < array.length; index++) {
await callback(array[index], index, array);
}
}
export function split(arr, n) {
var res = [];
while (arr.length) {
res.push(arr.splice(0, n));
}
return res;
}
export const delayMS = (t = 200) => {
return new Promise(resolve => {
setTimeout(() => {
resolve(t);
}, t);
});
};
export const throttledPromises = (
asyncFunction,
items = [],
batchSize = 1,
delay = 0
) => {
return new Promise(async (resolve, reject) => {
const output = [];
const batches= split(items, batchSize);
await asyncForEach(batches, async (batch) => {
const promises = batch.map(asyncFunction).map(p => p.catch(reject));
const results = await Promise.all(promises);
output.push(...results);
await delayMS(delay);
});
resolve(output);
});
};
我想你可以把你的问题分成两部分:同时调用不超过 5 个,并确保最新的调用在最旧的调用 1 秒之后才发生。
第一部分很容易用一个很棒的 p-limit 库来解决——它有迄今为止我见过的最简单的界面。
对于第二部分,您需要实际跟踪每个呼叫何时开始 - 即实现等待功能:
基本pseudo-code,未测试:
import pLimit from 'p-limit';
const apiLimit = pLimit(5);
const startTimes = [];
async function rateLimiter(item) {
const lastSecond = (new Date().getTime()) - 1000;
if (startTimes.filter(v => v > lastSecond).length >= 5) {
await new Promise(r => setTimeout(r, 1000));
}
// TODO: cleanup startTimes to avoid memory leak
startTimes.push(new Date().getTime());
return apiCall(item);
}
await Promise.all(items.map(v => apiLimit(() => rateLimiter(v))))
我们可以使用生成器来发送组中的承诺列表。
一旦解决了第一个 yield,我们就可以进行另一个 yield。
我们将结果存储在一个数组中。
一旦 promiseArray 长度等于结果长度,我们就可以解决
包裹的承诺。
const fetch = require("isomorphic-fetch");
const totalPromiseLength = 5;
const requestMethod = url => () => fetch(url).then(response => response.json());
let promiseArray = [...new Array(totalPromiseLength).keys()].map(index =>
requestMethod("https://jsonplaceholder.typicode.com/todos/" + (index + 1))
);
function* chunks(arr, limit) {
for (let i = 0; i < Math.ceil(arr.length / limit); ++i) {
yield [...arr].slice(i * limit, i * limit + limit);
}
}
new Promise(async resolve => {
let generated = chunks(promiseArray, 2);
let result = [];
for (let bla of generated) {
await Promise.all(bla.map(param => param())).then(response => {
result = [...result, ...response];
if (result.length === promiseArray.length) {
resolve(result);
}
});
}
}).then(response => {
console.log(response);
});
也许我头脑简单,但我写的这个版本只是将传入数组分成 5 个承诺的块,并在每个块上执行 Promise.all()
:
utility.throttledPromiseAll = async (promises) => {
const MAX_IN_PROCESS = 5;
const results = new Array(promises.length);
async function doBlock(startIndex) {
// Shallow-copy a block of promises to work on
const currBlock = promises.slice(startIndex, startIndex + MAX_IN_PROCESS);
// Await the completion. If any fail, it will throw and that's good.
const blockResults = await Promise.all(currBlock);
// Assuming all succeeded, copy the results into the results array
for (let ix = 0; ix < blockResults.length; ix++) {
results[ix + startIndex] = blockResults[ix];
}
}
for (let iBlock = 0; iBlock < promises.length; iBlock += MAX_IN_PROCESS) {
await doBlock(iBlock);
}
return results;
};
我有几个项目需要向第 3 方 API 查询,并且说 API 的呼叫限制为每秒 5 次呼叫。我需要以某种方式将对 API 的调用限制为每秒最多 5 次调用。
到目前为止,我只是在一系列承诺上使用了 Promise.all()
,其中每个承诺向 API 发送请求并在 API 以 HTTP 状态响应时解析代码 200
并在它以其他状态代码响应时拒绝。但是,当我在数组中有超过 5 个项目时,我冒着 Promise.all()
拒绝的风险。
如何将 Promise.all()
调用限制为每秒 5 次调用?
希望对您有所帮助。
而且还要说这将使用 Promise.all
来解决所有请求,如果您有大量查询,这将等待所有问题都解决并且可能导致您的代码等待很多所有回应。
而且,如果其中一个请求被拒绝,Promise.all
将被拒绝。
我建议,如果您不需要所有结果,最好使用其他工具,例如 lodash debounce or throttle 或处理此问题的框架。
let items = [
{name: 'item1'},
{name: 'item2'},
{name: 'item3'},
{name: 'item4'},
{name: 'item5'},
{name: 'item6'}
];
// This is the api request that you send and return a promise
function apiCall(item) {
return new Promise((resolve) => {
setTimeout(() => resolve(item.name), 1000);
})
}
new Promise((resolve) => {
let results = [];
function sendReq (itemsList, iterate, apiCall) {
setTimeout(() => {
// slice itemsList to send request according to the api limit
let slicedArray = itemsList.slice(iterate * 5, (iterate * 5 + 5));
result = slicedArray.map(item => apiCall(item));
results = [...results, ...result];
// This will resolve the promise when reaches to the last iteration
if (iterate === Math.ceil(items.length / 5) - 1) {
resolve(results);
}
}, (1000 * iterate)); // every 1000ms runs (api limit of one second)
}
// This will make iteration to split array (requests) to chunks of five items
for (i = 0; i < Math.ceil(items.length / 5); i++) {
sendReq(items, i, apiCall);
}
}).then(Promise.all.bind(Promise)).then(console.log);
// Use Promise.all to wait for all requests to resolve
// To use it this way binding is required
如果您不太担心按顺序解决承诺,您可以使用 bluebird 中的并发选项。
下面一次只能处理 5 个查询。
const Promise = require('bluebird');
const buildQueries = (count) => {
let queries = [];
for(let i = 0; i < count; i++) {
queries.push({user: i});
};
return queries;
};
const apiCall = (item) => {
return new Promise(async (resolve, reject) => {
await Promise.delay(1000);
resolve(item.user);
});
};
const queries = buildQueries(20);
Promise.map(queries, async query => {
console.log( await apiCall(query) );
}, {concurrency: 5});
在没有库的情况下使用 ES6
export async function asyncForEach(array, callback) {
for (let index = 0; index < array.length; index++) {
await callback(array[index], index, array);
}
}
export function split(arr, n) {
var res = [];
while (arr.length) {
res.push(arr.splice(0, n));
}
return res;
}
export const delayMS = (t = 200) => {
return new Promise(resolve => {
setTimeout(() => {
resolve(t);
}, t);
});
};
export const throttledPromises = (
asyncFunction,
items = [],
batchSize = 1,
delay = 0
) => {
return new Promise(async (resolve, reject) => {
const output = [];
const batches= split(items, batchSize);
await asyncForEach(batches, async (batch) => {
const promises = batch.map(asyncFunction).map(p => p.catch(reject));
const results = await Promise.all(promises);
output.push(...results);
await delayMS(delay);
});
resolve(output);
});
};
我想你可以把你的问题分成两部分:同时调用不超过 5 个,并确保最新的调用在最旧的调用 1 秒之后才发生。
第一部分很容易用一个很棒的 p-limit 库来解决——它有迄今为止我见过的最简单的界面。
对于第二部分,您需要实际跟踪每个呼叫何时开始 - 即实现等待功能: 基本pseudo-code,未测试:
import pLimit from 'p-limit';
const apiLimit = pLimit(5);
const startTimes = [];
async function rateLimiter(item) {
const lastSecond = (new Date().getTime()) - 1000;
if (startTimes.filter(v => v > lastSecond).length >= 5) {
await new Promise(r => setTimeout(r, 1000));
}
// TODO: cleanup startTimes to avoid memory leak
startTimes.push(new Date().getTime());
return apiCall(item);
}
await Promise.all(items.map(v => apiLimit(() => rateLimiter(v))))
我们可以使用生成器来发送组中的承诺列表。 一旦解决了第一个 yield,我们就可以进行另一个 yield。 我们将结果存储在一个数组中。 一旦 promiseArray 长度等于结果长度,我们就可以解决 包裹的承诺。
const fetch = require("isomorphic-fetch");
const totalPromiseLength = 5;
const requestMethod = url => () => fetch(url).then(response => response.json());
let promiseArray = [...new Array(totalPromiseLength).keys()].map(index =>
requestMethod("https://jsonplaceholder.typicode.com/todos/" + (index + 1))
);
function* chunks(arr, limit) {
for (let i = 0; i < Math.ceil(arr.length / limit); ++i) {
yield [...arr].slice(i * limit, i * limit + limit);
}
}
new Promise(async resolve => {
let generated = chunks(promiseArray, 2);
let result = [];
for (let bla of generated) {
await Promise.all(bla.map(param => param())).then(response => {
result = [...result, ...response];
if (result.length === promiseArray.length) {
resolve(result);
}
});
}
}).then(response => {
console.log(response);
});
也许我头脑简单,但我写的这个版本只是将传入数组分成 5 个承诺的块,并在每个块上执行 Promise.all()
:
utility.throttledPromiseAll = async (promises) => {
const MAX_IN_PROCESS = 5;
const results = new Array(promises.length);
async function doBlock(startIndex) {
// Shallow-copy a block of promises to work on
const currBlock = promises.slice(startIndex, startIndex + MAX_IN_PROCESS);
// Await the completion. If any fail, it will throw and that's good.
const blockResults = await Promise.all(currBlock);
// Assuming all succeeded, copy the results into the results array
for (let ix = 0; ix < blockResults.length; ix++) {
results[ix + startIndex] = blockResults[ix];
}
}
for (let iBlock = 0; iBlock < promises.length; iBlock += MAX_IN_PROCESS) {
await doBlock(iBlock);
}
return results;
};