What is the best way to limit concurrency when using ES6's Promise.all()?
I have some code that iterates over a list queried from a database and makes an HTTP request for each element in that list. That list can sometimes be a reasonably large number (in the thousands), and I would like to make sure I am not hitting a web server with thousands of concurrent HTTP requests.
An abbreviated version of that code currently looks something like this...
function getCounts() {
return users.map(user => {
return new Promise(resolve => {
remoteServer.getCount(user) // makes an HTTP request
.then(() => {
/* snip */
resolve();
});
});
});
}
Promise.all(getCounts()).then(() => { /* snip */});
This code is running on Node 4.3.2. To reiterate: can Promise.all be managed so that only a certain number of Promises are in progress at any given time?
Note that Promise.all() doesn't trigger the promises to start their work; creating the promise itself does.
With that in mind, one solution would be to check, whenever a promise resolves, whether a new promise should be started or whether you're already at the limit.
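As a minimal sketch of that idea (makeRequest is a hypothetical promise-returning function standing in for your HTTP call):

function promiseAllLimited(items, makeRequest, limit) {
  let index = 0
  const results = []
  function next() {
    if (index >= items.length) return Promise.resolve()
    const i = index++ // claim the next slot
    // when this request settles, immediately start another one
    return makeRequest(items[i]).then(result => {
      results[i] = result
      return next()
    })
  }
  // kick off at most `limit` chains in parallel
  const starters = Array.from({ length: Math.min(limit, items.length) }, () => next())
  return Promise.all(starters).then(() => results)
}

You would call it as, e.g., promiseAllLimited(users, user => remoteServer.getCount(user), 10).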
There's really no need to reinvent the wheel here, though. One library that you could use for this purpose is es6-promise-pool. From their example:
var PromisePool = require('es6-promise-pool')
var promiseProducer = function () {
// Your code goes here.
// If there is work left to be done, return the next work item as a promise.
// Otherwise, return null to indicate that all promises have been created.
// Scroll down for an example.
}
// The number of promises to process simultaneously.
var concurrency = 3
// Create a pool.
var pool = new PromisePool(promiseProducer, concurrency)
// Start the pool.
var poolPromise = pool.start()
// Wait for the pool to settle.
poolPromise.then(function () {
console.log('All promises fulfilled')
}, function (error) {
console.log('Some promise rejected: ' + error.message)
})
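The "scroll down for an example" part isn't reproduced here; as a sketch, a producer for the question's use case might look like this (assuming the users array and remoteServer from the question):

var userIndex = 0
var promiseProducer = function () {
  if (userIndex < users.length) {
    // return the next work item as a promise
    return remoteServer.getCount(users[userIndex++])
  }
  return null // all promises have been created
}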
Instead of using promises to limit HTTP requests, use Node's built-in http.Agent.maxSockets. This removes the requirement of using a library or writing your own pooling code, and has the added advantage of more control over what you're limiting.
agent.maxSockets
By default set to Infinity. Determines how many concurrent sockets the agent can have open per origin. Origin is either a 'host:port' or 'host:port:localAddress' combination.
For example:
var http = require('http');
var agent = new http.Agent({maxSockets: 5}); // 5 concurrent connections per origin
var request = http.request({..., agent: agent}, ...);
Setting keepAlive to true may also help if you're making multiple requests to the same origin (see the docs above for more information).
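For a fuller picture, a sketch of a complete request using such an agent (the host and path are placeholders):

var http = require('http');

// One shared agent caps concurrent sockets per origin at 5
var agent = new http.Agent({ maxSockets: 5, keepAlive: true });

var req = http.request({
  host: 'example.com',   // placeholder host
  path: '/count',        // placeholder path
  agent: agent           // extra requests queue inside the agent
}, function (res) {
  res.resume(); // drain the response
});
req.end();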
Bluebird's Promise.map can take a concurrency option to control how many promises should run in parallel. Sometimes it is easier than .all because you don't need to create the promise array.
const Promise = require('bluebird')
function getCounts() {
return Promise.map(users, user => {
return new Promise(resolve => {
remoteServer.getCount(user) // makes an HTTP request
.then(() => {
/* snip */
resolve();
});
});
}, {concurrency: 10}); // <---- at most 10 http requests at a time
}
If you know how iterators work and how they are consumed, you don't need any extra library, since it becomes very easy to build your own concurrency yourself. Let me demonstrate:
/* [Symbol.iterator]() is equivalent to .values()
const iterator = [1,2,3][Symbol.iterator]() */
const iterator = [1,2,3].values()
// loop over all items with for..of
for (const x of iterator) {
console.log('x:', x)
// notice how this loop continues on the same iterator
// and consumes the rest of it, so the outer
// loop won't log any more x's
for (const y of iterator) {
console.log('y:', y)
}
}
We can use the same iterator and share it across all workers.
If you had used .entries() instead of .values(), you would have gotten an iterator of [index, value] pairs, which I will demonstrate below with a concurrency of 2.
const sleep = t => new Promise(rs => setTimeout(rs, t))
async function doWork(iterator) {
for (let [index, item] of iterator) {
await sleep(1000)
console.log(index + ': ' + item)
}
}
const iterator = Array.from('abcdefghij').entries()
const workers = new Array(2).fill(iterator).map(doWork)
// ^--- starts two workers sharing the same iterator
Promise.allSettled(workers).then(() => console.log('done'))
The benefit of this is that you can have a generator function instead of having everything ready at once.
What's even more awesome is that you can do stream.Readable.from(iterator) in Node (and eventually in WHATWG streams as well). And with transferable ReadableStreams, this makes the approach potentially very useful if you are working with web workers for performance as well.
Note: the difference compared to the example async-pool is that it spawns two workers, so if one worker throws an error for some reason at, say, index 5, it won't stop the other worker from doing the rest. You just go from 2 concurrency down to 1 (it won't stop everything there). So my advice is to catch all errors inside the doWork function, as sketched below.
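For example, a sketch of doWork with that error handling in place:

async function doWork(iterator) {
  for (let [index, item] of iterator) {
    try {
      await sleep(1000)
      console.log(index + ': ' + item)
    } catch (err) {
      // swallow (or collect) the error so this worker keeps consuming
      console.error('failed at ' + index, err)
    }
  }
}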
So I tried to make some of the examples shown work for my code, but since this was only for an import script and not production code, using the npm package batch-promises was surely the easiest path for me.
batch-promises
Easily batch promises
NOTE: Requires runtime to support Promise or to be polyfilled.
Api
batchPromises(int: batchSize, array: Collection, i => Promise: Iteratee)
The Promise: Iteratee will be called after each batch.
Use:
import batchPromises from 'batch-promises';
batchPromises(2, [1,2,3,4,5], i => new Promise((resolve, reject) => {
// The iteratee will fire after each batch resulting in the following behaviour:
// @ 100ms resolve items 1 and 2 (first batch of 2)
// @ 200ms resolve items 3 and 4 (second batch of 2)
// @ 300ms resolve remaining item 5 (last remaining batch)
setTimeout(() => {
resolve(i);
}, 100);
}))
.then(results => {
console.log(results); // [1,2,3,4,5]
});
This is what I did using Promise.race, inside my code here:
const identifyTransactions = async function() {
let promises = []
let concurrency = 0
for (let tx of this.transactions) {
if (concurrency > 4)
await Promise.race(promises).then(r => { promises = []; concurrency = 0 })
promises.push(tx.identifyTransaction())
concurrency++
}
if (promises.length > 0)
await Promise.race(promises) //resolve the rest
}
P-Limit
I compared promise concurrency limiting with a custom script, bluebird, es6-promise-pool, and p-limit. I believe that p-limit has the most simple, stripped-down implementation for this need. See their documentation.
Requirements
To be compatible with async in the example:
- ECMAScript 2017 (version 8)
- Node version > 8.2.1
My Example
In this example, we need to run a function for each URL in the array (like, maybe an API request). Here this is called fetchData(). If we had an array of thousands of items to process, concurrency would definitely be useful to save on CPU and memory resources.
const pLimit = require('p-limit');
// Example Concurrency of 3 promise at once
const limit = pLimit(3);
let urls = [
"http://www.exampleone.com/",
"http://www.exampletwo.com/",
"http://www.examplethree.com/",
"http://www.examplefour.com/",
]
// Create an array of our promises using map (fetchData() returns a promise)
let promises = urls.map(url => {
// wrap the function we are calling in the limit function we defined above
return limit(() => fetchData(url));
});
(async () => {
// Only three promises are run at once (as defined above)
const result = await Promise.all(promises);
console.log(result);
})();
The console log result is an array of your resolved promises' response data.
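Note that fetchData() is not defined above; a hypothetical version (using node-fetch, or any promise-returning HTTP client) might be:

const fetch = require('node-fetch'); // hypothetical choice of HTTP client

const fetchData = (url) => fetch(url).then(res => res.text());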
Here is a basic example with streaming and 'p-limit'. It streams an HTTP read stream into Mongo DB.
const stream = require('stream');
const util = require('util');
const pLimit = require('p-limit');
const es = require('event-stream');
const JSONStream = require('JSONStream'); // required for JSONStream.parse('*') below
const streamToMongoDB = require('stream-to-mongo-db').streamToMongoDB;
const pipeline = util.promisify(stream.pipeline)
const outputDBConfig = {
dbURL: 'yr-db-url',
collection: 'some-collection'
};
const limit = pLimit(3);
const yrAsyncStreamingFunction = async (readStream) => {
const mongoWriteStream = streamToMongoDB(outputDBConfig);
const mapperStream = es.map((data, done) => {
let someDataPromise = limit(() => yr_async_call_to_somewhere())
someDataPromise.then(
function handleResolve(someData) {
data.someData = someData;
done(null, data);
},
function handleError(error) {
done(error)
}
);
})
await pipeline(
readStream,
JSONStream.parse('*'),
mapperStream,
mongoWriteStream
);
}
If you don't want to use an external library, recursion is the answer.
// A method on some class; note this chain processes items one at a time
downloadAll(someArrayWithData){
var self = this;
var tracker = function(next){
return self.someExpensiveRequest(someArrayWithData[next])
.then(function(){
next++;//This updates the next in the tracker function parameter
if(next < someArrayWithData.length){//Did I finish processing all my data?
return tracker(next);//Go to the next promise
}
});
}
return tracker(0);
}
It can be solved using recursion.
The idea is that initially you send the maximum allowed number of requests, and each of these requests should recursively continue to send itself on its completion.
function batchFetch(urls, concurrentRequestsLimit) {
    return new Promise(resolve => {
        var documents = [];
        var completed = 0;
        var index = 0;
        function recursiveFetch() {
            if (index === urls.length) {
                return;
            }
            var i = index++; // capture this request's slot
            fetch(urls[i])
                .then(r => r.text()) // wait for the body, not just the headers
                .then(text => {
                    documents[i] = text; // keep results in input order
                    completed++;
                    if (completed === urls.length) {
                        resolve(documents);
                    } else {
                        recursiveFetch();
                    }
                });
        }
        for (var i = 0; i < concurrentRequestsLimit; i++) {
            recursiveFetch();
        }
    });
}
var sources = [
'http://www.example_1.com/',
'http://www.example_2.com/',
'http://www.example_3.com/',
...
'http://www.example_100.com/'
];
batchFetch(sources, 5).then(documents => {
console.log(documents);
});
I recommend the async-pool library: https://github.com/rxaviers/async-pool
npm install tiny-async-pool
Description:
Run multiple promise-returning & async functions with limited concurrency using native ES6/ES7
asyncPool runs multiple promise-returning & async functions in a limited concurrency pool. It rejects immediately as soon as one of the promises rejects. It resolves when all the promises completes. It calls the iterator function as soon as possible (under concurrency limit).
Usage:
const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
// Call iterator (i = 1000)
// Call iterator (i = 5000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 1000 finishes
// Call iterator (i = 3000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 3000 finishes
// Call iterator (i = 2000)
// Iteration is complete, wait until running ones complete...
// 5000 finishes
// 2000 finishes
// Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`.
Using Array.prototype.splice:
while (funcs.length) {
// 100 at a time
await Promise.all( funcs.splice(0, 100).map(f => f()) )
}
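Here funcs is assumed to be an array of promise-returning functions, so nothing starts until f() is called inside the loop. For example, reusing the question's names:

// wrap each call in a function so it starts lazily, inside the loop above
const funcs = users.map(user => () => remoteServer.getCount(user))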
- @tcooc's answer was quite cool. Didn't know about it, will be using it in the future.
- I also enjoyed @MatthewRideout's answer, but it uses an external library!!
Whenever possible, I give a shot at developing this kind of thing myself rather than going for a library. You end up learning a lot of concepts which seemed daunting before.
class Pool{
constructor(maxAsync) {
this.maxAsync = maxAsync;
this.asyncOperationsQueue = [];
this.currentAsyncOperations = 0
}
runAnother() {
if (this.asyncOperationsQueue.length > 0 && this.currentAsyncOperations < this.maxAsync) {
this.currentAsyncOperations += 1;
this.asyncOperationsQueue.pop()()
.then(() => { this.currentAsyncOperations -= 1; this.runAnother() }, () => { this.currentAsyncOperations -= 1; this.runAnother() })
}
}
add(f){  // the argument f is a function of signature () => Promise
    return new Promise((resolve, reject) => {
        this.asyncOperationsQueue.push(
            () => f().then(resolve).catch(reject)
        );
        // queue first, then try to start it, so a lone task also runs
        this.runAnother();
    })
}
}
//#######################################################
// TESTS
//#######################################################
function dbCall(id, timeout, fail) {
return new Promise((resolve, reject) => {
setTimeout(() => {
if (fail) {
reject(`Error for id ${id}`);
} else {
resolve(id);
}
}, timeout)
}
)
}
const dbQuery1 = () => dbCall(1, 5000, false);
const dbQuery2 = () => dbCall(2, 5000, false);
const dbQuery3 = () => dbCall(3, 5000, false);
const dbQuery4 = () => dbCall(4, 5000, true);
const dbQuery5 = () => dbCall(5, 5000, false);
const cappedPool = new Pool(2);
const dbQuery1Res = cappedPool.add(dbQuery1).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery2Res = cappedPool.add(dbQuery2).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery3Res = cappedPool.add(dbQuery3).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery4Res = cappedPool.add(dbQuery4).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery5Res = cappedPool.add(dbQuery5).catch(i => i).then(i => console.log(`Resolved: ${i}`))
This approach provides a nice API, similar to thread pools in Scala/Java.
After creating one instance of the pool with const cappedPool = new Pool(2), you provide promises to it simply with cappedPool.add(() => myPromise).
Obviously, we must make sure that the promises do not start immediately; that is why we must "provide them lazily" with the help of a function.
Most importantly, notice that the result of the method add is a Promise which will be completed/resolved with the value of your original promise! This makes for a very intuitive use:
const resultPromise = cappedPool.add( () => dbCall(...))
resultPromise
.then( actualResult => {
// Do something with the result from the DB
}
)
Unfortunately there is no way to do it with native Promise.all, so you have to be creative.
This is the quickest, most concise way I could find without using any outside libraries.
It makes use of a newer javascript feature called an iterator. The iterator basically keeps track of which items have been processed and which haven't.
In order to use it in code, you create an array of async functions. Each async function asks the same iterator for the next item that needs to be processed. Each function processes its own item asynchronously, and when done asks the iterator for a new one. Once the iterator runs out of items, all of the functions complete.
Thanks to @Endless for the inspiration.
const items = [
'https://httpbin.org/bytes/2',
'https://httpbin.org/bytes/2',
'https://httpbin.org/bytes/2',
'https://httpbin.org/bytes/2',
'https://httpbin.org/bytes/2',
'https://httpbin.org/bytes/2',
'https://httpbin.org/bytes/2',
'https://httpbin.org/bytes/2'
]
const concurrency = 5
Array(concurrency).fill(items.entries()).map(async (cursor) => {
for (let [index, url] of cursor){
console.log('getting url is ', index, url)
// run your async task instead of this next line
var text = await fetch(url).then(res => res.text())
console.log('text is', text.slice(0, 20))
}
})
Here is my ES7 solution: a copy-paste friendly and feature complete Promise.all()/map() alternative, with a concurrency limit.
Similar to Promise.all(), it maintains return order as well as a fallback for non-promise return values.
I also included a comparison of the different implementations, as it illustrates some aspects a few of the other solutions have missed.
Usage
const asyncFn = delay => new Promise(resolve => setTimeout(() => resolve(), delay));
const args = [30, 20, 15, 10];
await asyncPool(args, arg => asyncFn(arg), 4); // concurrency limit of 4
Implementation
async function asyncBatch(args, fn, limit = 8) {
// Copy arguments to avoid side effects
args = [...args];
const outs = [];
while (args.length) {
const batch = args.splice(0, limit);
const out = await Promise.all(batch.map(fn));
outs.push(...out);
}
return outs;
}
async function asyncPool(args, fn, limit = 8) {
return new Promise((resolve) => {
// Copy arguments to avoid side effect, reverse queue as
// pop is faster than shift
const argQueue = [...args].reverse();
let count = 0;
const outs = [];
const pollNext = () => {
if (argQueue.length === 0 && count === 0) {
resolve(outs);
} else {
while (count < limit && argQueue.length) {
const index = args.length - argQueue.length;
const arg = argQueue.pop();
count += 1;
const out = fn(arg);
const processOut = (out, index) => {
outs[index] = out;
count -= 1;
pollNext();
};
if (typeof out === 'object' && out.then) {
out.then(out => processOut(out, index));
} else {
processOut(out, index);
}
}
}
};
pollNext();
});
}
Comparison
// A simple async function that returns after the given delay
// and prints its value to allow us to determine the response order
const asyncFn = delay => new Promise(resolve => setTimeout(() => {
console.log(delay);
resolve(delay);
}, delay));
// List of arguments to the asyncFn function
const args = [30, 20, 15, 10];
// As a comparison of the different implementations, a low concurrency
// limit of 2 is used in order to highlight the performance differences.
// If a limit greater than or equal to args.length is used the results
// would be identical.
// Vanilla Promise.all/map combo
const out1 = await Promise.all(args.map(arg => asyncFn(arg)));
// prints: 10, 15, 20, 30
// total time: 30ms
// Pooled implementation
const out2 = await asyncPool(args, arg => asyncFn(arg), 2);
// prints: 20, 30, 15, 10
// total time: 40ms
// Batched implementation
const out3 = await asyncBatch(args, arg => asyncFn(arg), 2);
// prints: 20, 30, 10, 15
// total time: 45ms
console.log(out1, out2, out3); // prints: [30, 20, 15, 10] x 3
// Conclusion: Execution order and performance is different,
// but return order is still identical
Conclusion
asyncPool() should be the best solution, as it allows new requests to start as soon as any previous one finishes.
asyncBatch() is included as a comparison, as its implementation is simpler to understand, but it should be slower in performance since all requests in the same batch have to finish before the next batch can start.
In this contrived example, the non-limited vanilla Promise.all() is of course the fastest, while the others perform more desirably in a real-world congestion scenario.
Update
The async-pool library that others have already suggested is probably a better alternative to my implementation, as it works almost identically and has a more concise implementation with a clever use of Promise.race(): https://github.com/rxaviers/async-pool/blob/master/lib/es7.js
Hopefully my answer can still serve an educational value.
So many good solutions. I started out with the elegant solution posted by @Endless and ended up with this little extension method that does not use any external libraries, nor does it run in batches (although it assumes you have features like async, etc.):
Promise.allWithLimit = async (taskList, limit = 5) => {
const iterator = taskList.entries();
let results = new Array(taskList.length);
let workerThreads = new Array(limit).fill(0).map(() =>
new Promise(async (resolve, reject) => {
try {
let entry = iterator.next();
while (!entry.done) {
let [index, promise] = entry.value;
try {
results[index] = await promise;
entry = iterator.next();
}
catch (err) {
results[index] = err;
}
}
// No more work to do
resolve(true);
}
catch (err) {
// This worker is dead
reject(err);
}
}));
await Promise.all(workerThreads);
return results;
};
const demoTasks = new Array(10).fill(0).map((v,i) => new Promise(resolve => {
let n = (i + 1) * 5;
setTimeout(() => {
console.log(`Did nothing for ${n} seconds`);
resolve(n);
}, n * 1000);
}));
var results = Promise.allWithLimit(demoTasks);
Expanding on the answer posted by @deceleratedcaviar, I created a 'batch' utility function that takes as arguments: an array of values, a concurrency limit, and a processing function. Yes, I realize that using Promise.all this way is more akin to batch processing than true concurrency, but if the goal is to limit excessive HTTP calls at one time, I'd go with this approach due to its simplicity and lack of need for external libraries.
async function batch(o) {
let arr = o.arr
let resp = []
while (arr.length) {
let subset = arr.splice(0, o.limit)
let results = await Promise.all(subset.map(o.process))
resp.push(results)
}
return [].concat.apply([], resp)
}
let arr = []
for (let i = 0; i < 250; i++) { arr.push(i) }
async function calc(val) { return val * 100 }
(async () => {
let resp = await batch({
arr: arr,
limit: 100,
process: calc
})
console.log(resp)
})();
One more solution, using a custom promise library (CPromise):
import { CPromise } from "c-promise2";
import cpFetch from "cp-fetch";
const promise = CPromise.all(
function* () {
const urls = [
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=1",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=2",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=3",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=4",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=5",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=6",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=7"
];
for (const url of urls) {
yield cpFetch(url); // add a promise to the pool
console.log(`Request [${url}] completed`);
}
},
{ concurrency: 2 }
).then(
(v) => console.log(`Done: `, v),
(e) => console.warn(`Failed: ${e}`)
);
// yeah, we're able to cancel the task and abort pending network requests
// setTimeout(() => promise.cancel(), 4500);
import { CPromise } from "c-promise2";
import cpFetch from "cp-fetch";
const promise = CPromise.all(
[
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=1",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=2",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=3",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=4",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=5",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=6",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=7"
],
{
mapper: (url) => {
console.log(`Request [${url}]`);
return cpFetch(url);
},
concurrency: 2
}
).then(
(v) => console.log(`Done: `, v),
(e) => console.warn(`Failed: ${e}`)
);
// yeah, we're able to cancel the task and abort pending network requests
//setTimeout(() => promise.cancel(), 4500);
Warning: this hasn't been benchmarked for efficiency and does a lot of array copying/creation.
If you want a more functional approach, you could do something like this:
import chunk from 'lodash.chunk';
const maxConcurrency = (max) => (dataArr, promiseFn) =>
chunk(dataArr, max).reduce(
async (agg, batch) => [
...(await agg),
...(await Promise.all(batch.map(promiseFn)))
],
[]
);
And then you could use it like this:
const randomFn = (data) =>
new Promise((res) => setTimeout(
() => res(data + 1),
Math.random() * 1000
));
const result = await maxConcurrency(5)(
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
randomFn
);
console.log('result+++', result);
Semaphore is a well-known concurrency primitive that was designed to solve similar problems. It's a very universal construct; implementations of Semaphore exist in many languages. This is how one would use a Semaphore to solve this issue:
const { Semaphore } = require('async-mutex'); // Semaphore comes from the async-mutex package

async function main() {
  const s = new Semaphore(100);
  const res = await Promise.all(
    users.map((user) =>
      s.runExclusive(() => remoteServer.getCount(user))
    )
  );
  return res;
}
I'm using the Semaphore implementation from async-mutex, which has decent documentation and TypeScript support.
If you want to dig deep into topics like this, you can take a look at the book "The Little Book of Semaphores", which is freely available as a PDF here.
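For the curious, a minimal hand-rolled counting semaphore sketch (illustrative only, not the async-mutex API):

class SimpleSemaphore {
  constructor(max) {
    this.active = 0;    // permits currently held
    this.max = max;     // maximum concurrent holders
    this.waiting = [];  // resolvers of callers waiting for a permit
  }
  async acquire() {
    if (this.active < this.max) { this.active++; return; }
    // wait until release() hands us a permit directly
    await new Promise(resolve => this.waiting.push(resolve));
  }
  release() {
    const next = this.waiting.shift();
    if (next) next();    // transfer the permit to one waiter
    else this.active--;  // otherwise free it
  }
  async runExclusive(fn) {
    await this.acquire();
    try { return await fn(); }
    finally { this.release(); }
  }
}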
If your goal is to slow down Promise.all to avoid rate limits or overload:
Here is my implementation:
async function promiseAllGentle(arr, batchSize = 5, sleep = 50) {
let output = [];
while (arr.length) {
const batchResult = await Promise.all(arr.splice(0, batchSize));
output = [...output, ...batchResult];
await new Promise((res) => setTimeout(res, sleep));
}
return output;
}
As everyone else in this answer thread has pointed out, Promise.all() won't do the right thing if you need to limit concurrency. But ideally you shouldn't even want to wait until all of the Promises are done before processing them.
Instead, you want to process each result as soon as it becomes available, so you don't have to wait for the very last promise to finish before you start iterating over them.
So, here's a code sample, based in part on other answers in this thread.
async function* raceAsyncIterators(iterators) {
async function queueNext(iteratorResult) {
delete iteratorResult.result; // Release previous result ASAP
iteratorResult.result = await iteratorResult.iterator.next();
return iteratorResult;
};
const iteratorResults = new Map(iterators.map(iterator => [
iterator,
queueNext({ iterator })
]));
while (iteratorResults.size) {
const winner = await Promise.race(iteratorResults.values());
if (winner.result.done) {
iteratorResults.delete(winner.iterator);
} else {
const { value } = winner.result;
iteratorResults.set(winner.iterator, queueNext(winner));
yield value;
}
}
}
async function* runTasks(maxConcurrency, iterator) {
// Each worker is an async generator that polls for tasks
// from the shared iterator.
// Sharing the iterator ensures that each worker gets unique tasks.
const workers = new Array(maxConcurrency);
for (let i = 0; i < maxConcurrency; i++) {
workers[i] = (async function*() {
for (const task of iterator) yield await task();
})();
}
yield* raceAsyncIterators(workers);
}
// example tasks that sleep and return a number
function sleep(ms) { return new Promise(r => setTimeout(r, ms)); }
const tasks = [];
for (let i = 0; i < 20; i++) {
tasks.push(async () => {
console.log(`start ${i}`);
await sleep(Math.random() * 1000);
console.log(`end ${i}`);
return i;
});
}
(async () => {
for await (let value of runTasks(3, tasks.values())) {
console.log(`output ${value}`);
}
})()
There's a lot of magic here; let me explain.
We start with an async generator function (an async function*()) called raceAsyncIterators(). raceAsyncIterators() is like Promise.race(), but with N iterators of Promises instead of just N Promises; it returns an async iterator that yields the results of resolved Promises.
raceAsyncIterators() keeps an iteratorResults map, mapping from iterators (as keys) to Promises of iteratorResult objects; each iteratorResult has an iterator property and a result (the awaited result of the iterator's next() Promise).
raceAsyncIterators() calls Promise.race() to let the iteratorResult Promises race to complete their tasks. When the winning iteratorResult says its iterator is completely done, we remove it from the map; otherwise, we replace its Promise in the iteratorResults map with the iterator's next() Promise and yield the result value.
With this, we can now define our runTasks() function.
runTasks() takes an iterator parameter, a non-async iterator of "tasks" to perform. Each task is an async function (a regular async function(), not an async generator async function*()). It also takes a number maxConcurrency, the number of workers we'll run.
// Each worker is an async iterator that polls for tasks
// from the shared iterator.
// Sharing the iterator ensures that each worker gets unique tasks.
const workers = new Array(maxConcurrency);
for (let i = 0; i < maxConcurrency; i++) {
workers[i] = (async function*() {
for (const task of iterator) yield await task();
})();
}
Note that the workers are initially defined as async generator functions, but we immediately invoke each function, storing each resulting async iterator in the workers array.
If we had just one worker iterator, we could call for await (let result of worker()) to get a stream of results.
But, since we have N worker iterators, we want to race them with raceAsyncIterators(), processing results from whichever worker iterator yields a result first.
The last line of runTasks() is:
yield* raceAsyncIterators(workers)
yield* is a somewhat uncommon JS expression in which one generator can yield the results of another generator. This yield* line yields whichever results win the race.
With runTasks(), we can use a for await loop, like this:
for await (const value of runTasks(3, tasks.values())) {
console.log(`output ${value}`);
}
This returns each Promise's value in the order in which they're resolved.
In the example, we generate an array of 20 async tasks that sleep for a random amount of time and return a number. (In real life, you'd probably make an array of async functions that fetch URLs or something.)
The example calls runTasks with 3 concurrent workers, so no more than 3 tasks launch at a time. When any task finishes, we immediately queue up the next task. (This is superior to "batching", where you do 3 tasks at once, await all three of them, and don't start the next batch of three until the entire previous batch has finished.)
I've been using the bottleneck library, which I actually really like, but in my case it wasn't releasing memory and kept on handling long-running jobs... which isn't great for running the massive workloads that you likely want a throttling/concurrency library for in the first place.
I needed a simple, low-overhead, easy-to-maintain solution. I also wanted something that kept the pool topped up, rather than simply batching predefined chunks... In the case of a downloader, this stops an nGB file from holding up your queue for minutes/hours at a time, even though the rest of the batch finished ages ago.
Here is the Node.js v16+, no-dependency, async generator solution I've been using:
const promiseState = function( promise ) {
// A promise could never resolve to a unique symbol unless it was in this scope
const control = Symbol();
// This helps us determine the state of the promise... A little heavy, but it beats a third-party promise library. The control is the second element passed to Promise.race() since it will only resolve first if the promise being tested is pending.
return Promise
.race([ promise, control ])
.then( value => ( value === control ) ? 'pending' : 'fulfilled' )
.catch( () => 'rejected' );
}
const throttle = async function* ( reservoir, promiseFunction, highWaterMark ) {
let iterable = reservoir.splice( 0, highWaterMark ).map( item => promiseFunction( item ) );
while ( iterable.length > 0 ) {
// When a promise has resolved we have space to top it up to the high water mark...
await Promise.any( iterable );
const pending = [];
const resolved = [];
// This identifies the promise(s) that have resolved so that we can yield them
for ( const currentValue of iterable ) {
if ( await promiseState( currentValue ) === 'pending' ) {
pending.push( currentValue );
} else {
resolved.push( currentValue );
}
}
// Put the remaining promises back into iterable, and top it to the high water mark
iterable = [
...pending,
...reservoir.splice( 0, highWaterMark - pending.length ).map( value => promiseFunction( value ) )
];
yield Promise.allSettled( resolved );
}
}
// This is just an example of what would get passed as "promiseFunction"... This can be the function that returns your HTTP request promises
const getTimeout = delay => new Promise( (resolve, reject) => setTimeout(resolve, delay, delay) );
// This is just the async IIFE that bootstraps this example
( async () => {
const test = [ 1000, 2000, 3000, 4000, 5000, 6000, 1500, 2500, 3500, 4500, 5500, 6500 ];
for await ( const timeout of throttle( test, getTimeout, 4 ) ) {
console.log( timeout );
}
} )();
I have a solution based on creating chunks and using the .reduce function to wait for each chunk's promise.all to complete. I also add a delay in case the promises have some call limits.
export function delay(ms: number) {
return new Promise<void>((resolve) => setTimeout(resolve, ms));
}
export const chunk = <T>(arr: T[], size: number): T[][] => [
...Array(Math.ceil(arr.length / size)),
].map((_, i) => arr.slice(size * i, size + size * i));
const myIdList = []; // all items
const groupedIdList = chunk(myIdList, 20); // grouped by 20 items
await groupedIdList.reduce(async (prev, subIdList) => {
await prev;
// Make sure we wait for 500 ms after processing every page to prevent overloading the calls.
const data = await Promise.all(subIdList.map(myPromise));
await delay(500);
}, Promise.resolve());
This solution uses an async generator to manage concurrent promises with vanilla javascript. The throttle generator takes 3 arguments:
- An array of values to be supplied as arguments to a promise-generating function. (e.g., an array of URLs.)
- A function that returns a promise. (e.g., returns a promise for an HTTP request.)
- An integer representing the maximum concurrent promises allowed.
Promises are only instantiated as required, in order to reduce memory consumption. Results can be iterated over using a for await...of statement.
The example below provides a function to check promise state, the throttle async generator, and a simple function that returns a promise based on setTimeout. The async IIFE at the end defines the reservoir of timeout values, obtains the async iterable returned by throttle(), and then iterates over the results as they resolve.
If you would like a more complete example for HTTP requests, let me know in the comments.
Please note that Node.js 16+ is required in order for the async generators to work.
const promiseState = function( promise ) {
const control = Symbol();
return Promise
.race([ promise, control ])
.then( value => ( value === control ) ? 'pending' : 'fulfilled' )
.catch( () => 'rejected' );
}
const throttle = async function* ( reservoir, promiseClass, highWaterMark ) {
let iterable = reservoir.splice( 0, highWaterMark ).map( item => promiseClass( item ) );
while ( iterable.length > 0 ) {
await Promise.any( iterable );
const pending = [];
const resolved = [];
for ( const currentValue of iterable ) {
if ( await promiseState( currentValue ) === 'pending' ) {
pending.push( currentValue );
} else {
resolved.push( currentValue );
}
}
console.log({ pending, resolved, reservoir });
iterable = [
...pending,
...reservoir.splice( 0, highWaterMark - pending.length ).map( value => promiseClass( value ) )
];
yield Promise.allSettled( resolved );
}
}
const getTimeout = delay => new Promise( ( resolve, reject ) => {
setTimeout(resolve, delay, delay);
} );
( async () => {
const test = [ 1100, 1200, 1300, 10000, 11000, 9000, 5000, 6000, 3000, 4000, 1000, 2000, 3500 ];
const throttledRequests = throttle( test, getTimeout, 4 );
for await ( const timeout of throttledRequests ) {
console.log( timeout );
}
} )();
A good solution for controlling the maximum number of promises/requests is to split your list of requests into pages, and produce requests for only one page at a time.
The example below makes use of the iter-ops library:
import {pipe, toAsync, map, page, wait} from 'iter-ops';

const i = pipe(
    toAsync(users), // make it asynchronous
    page(10), // split into pages of 10 items in each
    map(p => Promise.all(p.map(u => remoteServer.getCount(u)))), // map into requests
    wait() // resolve each page in the pipeline
);
// below triggers processing page-by-page:
for await(const p of i) {
//=> p = resolved page of data
}
This way it won't try to create more requests/promises than the size of one page.
Using tiny-async-pool's ES9 for await...of API, you can do the following:
const asyncPool = require("tiny-async-pool");
const getCount = async (user) => ([user, await remoteServer.getCount(user)]);
const concurrency = 2;
for await (const [user, count] of asyncPool(concurrency, users, getCount)) {
console.log(user, count);
}
The asyncPool function above returns an async iterator that yields as soon as a promise completes (under the concurrency limit), and it rejects immediately as soon as one of the promises rejects.
It is possible to limit requests to a server by using https://www.npmjs.com/package/job-pipe.
Basically, you create a pipe and tell it how many concurrent requests you want:
const pipe = createPipe({ throughput: 6, maxQueueSize: Infinity })
Then you take the function which performs the call and force it through the pipe to create a limited amount of simultaneous calls:
const makeCall = async () => {...}
const limitedMakeCall = pipe(makeCall)
Finally, you call this method as many times as you want, as if it were unchanged, and it will limit itself on how many parallel executions it can handle:
await limitedMakeCall()
await limitedMakeCall()
await limitedMakeCall()
await limitedMakeCall()
await limitedMakeCall()
....
await limitedMakeCall()
Profit.