Throttle amount of promises open at a given time
The following TypeScript performs the calls to doSomething(action) one at a time. (That is, the call for the second item in the list is not made until the first has finished.)
async performActionsOneAtATime() {
  for (let action of listOfActions) {
    const actionResult = await doSomething(action);
    console.log(`Action Done: ${actionResult}`);
  }
}
This one sends all of the requests to the server immediately (without waiting for any responses):
async performActionsInParallel() {
  for (let action of listOfActions) {
    const actionResultPromise = doSomething(action);
    actionResultPromise.then((actionResult) => {
      console.log(`Action Done: ${actionResult}`);
    });
  }
}
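But what I really need is a way to throttle them: maybe have 10 or 20 calls open at a time. (One at a time is too slow, but all 600 at once will overload the server.)
But I am having a hard time figuring this out.
Any suggestions on how I can throttle the number of calls to X open at a time?
(This question uses TypeScript, but I would be fine with an ES6 JavaScript answer.)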
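You can do this with a pub-sub pattern. I'm not familiar with TypeScript either, and I don't know whether this is happening in the browser or on the back end. I'll just write pseudocode for this (assuming it's the back end):
// EventEmitter ships with Node.js
const events = require("events");
const limit = 10;
const emitter = new events.EventEmitter();
function fetchNext(action) {
  // pop() returns undefined once the list is empty, so stop there
  if (action === undefined) return;
  doSomething(action).then((actionResult) => {
    console.log(`Action Done: ${actionResult}`);
    // a slot is free again: announce it and pass along the next action
    emitter.emit('grabTheNextOne', listOfActions.pop());
  });
}
// subscribe first, so that every completion pulls in the next action
emitter.on('grabTheNextOne', fetchNext);
// start the first <limit> actions
for (let i = 0; i < limit; i++) {
  fetchNext(listOfActions.pop());
}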
If you are working in Node, EventEmitter is part of Node.js. If you are in the browser, you can use the normal events model. The key idea here is the pub-sub pattern.
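For the browser case mentioned above, here is a minimal sketch of the same pub-sub idea built on EventTarget instead of EventEmitter. The helper name runWithEvents and its parameters are made up for illustration, and it assumes an environment where EventTarget and Event are globals (browsers and recent Node.js versions). Results arrive in completion order, not input order.

```javascript
// Hypothetical helper: runs worker(action) for every action, with at most
// `limit` calls in flight, and resolves with the results in completion order.
function runWithEvents(actions, worker, limit) {
  return new Promise((resolve, reject) => {
    const bus = new EventTarget();
    const queue = actions.slice(); // copy, so the caller's array is untouched
    const results = [];
    let pending = 0;

    function fetchNext() {
      if (queue.length === 0) {
        // nothing left to start; finish once the last call has settled
        if (pending === 0) resolve(results);
        return;
      }
      const action = queue.shift();
      pending++;
      // wrapping in a Promise also turns a synchronous throw into a rejection
      new Promise((res) => res(worker(action))).then((result) => {
        results.push(result);
        pending--;
        // publish: a slot is free again
        bus.dispatchEvent(new Event("grabTheNextOne"));
      }, reject);
    }

    // subscribe: every freed slot pulls in the next action
    bus.addEventListener("grabTheNextOne", fetchNext);
    for (let i = 0; i < limit; i++) fetchNext();
  });
}

runWithEvents([1, 2, 3, 4, 5], (n) => Promise.resolve(n * 2), 2)
  .then((results) => console.log(results));
```

This sketch rejects on the first failure and does not cancel calls that are already in flight.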
There isn't anything built in for this, so you'll have to build your own. AFAIK, there's no library for this yet, either.
First, start with a "deferral": a promise that allows external code to resolve it:
class Deferral<T> {
  constructor() {
    this.promise = new Promise<T>((resolve, reject) => {
      this.resolve = resolve;
      this.reject = reject;
    });
  }
  promise: Promise<T>;
  resolve: (thenableOrResult?: T | PromiseLike<T>) => void;
  reject: (error: any) => void;
}
Then you can define a "wait queue", which represents all the code blocks that are waiting to enter the critical section:
class WaitQueue<T> {
  private deferrals: Deferral<T>[];
  constructor() {
    this.deferrals = [];
  }
  get isEmpty(): boolean {
    return this.deferrals.length === 0;
  }
  enqueue(): Promise<T> {
    const deferral = new Deferral<T>();
    this.deferrals.push(deferral);
    return deferral.promise;
  }
  dequeue(result?: T) {
    const deferral = this.deferrals.shift();
    deferral.resolve(result);
  }
}
Finally, you can define an async semaphore, like this:
export class AsyncSemaphore {
  private queue: WaitQueue<void>;
  private _count: number;
  constructor(count: number = 0) {
    this.queue = new WaitQueue<void>();
    this._count = count;
  }
  get count(): number { return this._count; }
  waitAsync(): Promise<void> {
    if (this._count !== 0) {
      --this._count;
      return Promise.resolve();
    }
    return this.queue.enqueue();
  }
  release(value: number = 1) {
    while (value !== 0 && !this.queue.isEmpty) {
      this.queue.dequeue();
      --value;
    }
    this._count += value;
  }
}
Example usage:
async function performActionsInParallel() {
  const semaphore = new AsyncSemaphore(10);
  const listOfActions = [...];
  const promises = listOfActions.map(async (action) => {
    await semaphore.waitAsync();
    try {
      // return the value so that Promise.all below collects the results
      return await doSomething(action);
    }
    finally {
      semaphore.release();
    }
  });
  const results = await Promise.all(promises);
}
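This method first creates a throttler and then immediately starts all the asynchronous operations. Each asynchronous operation first (asynchronously) waits for the semaphore to be free, then performs the action, and finally releases the semaphore (allowing another one in). When all the asynchronous operations have completed, all the results are retrieved.
Warning: this code is 100% completely untested. I haven't even tried it once.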
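Since the code above is described as untested, here is a compact plain-JavaScript sketch of the same wait / act / release cycle that can be run directly. The names Semaphore, acquire, release and mapWithLimit are illustrative and not taken from the answer, and the deferral and wait queue are folded into a single array of resolve callbacks.

```javascript
// Minimal counting semaphore (illustrative names, not the answer's classes).
class Semaphore {
  constructor(count) {
    this.count = count; // number of free slots
    this.waiters = [];  // resolve callbacks of callers waiting for a slot
  }
  acquire() {
    if (this.count > 0) {
      this.count--;
      return Promise.resolve();
    }
    return new Promise((resolve) => this.waiters.push(resolve));
  }
  release() {
    const next = this.waiters.shift();
    if (next) next();  // hand the slot straight to the oldest waiter
    else this.count++; // nobody is waiting: mark the slot as free
  }
}

// Starts one async wrapper per item right away; each wrapper waits for a
// slot, runs fn(item), and frees the slot even if fn rejects.
function mapWithLimit(items, limit, fn) {
  const semaphore = new Semaphore(limit);
  return Promise.all(items.map(async (item) => {
    await semaphore.acquire();
    try {
      return await fn(item);
    } finally {
      semaphore.release();
    }
  }));
}

mapWithLimit([1, 2, 3, 4, 5], 2, async (n) => n * 2)
  .then((results) => console.log(results)); // [ 2, 4, 6, 8, 10 ]
```

Because the wrappers are combined with Promise.all, the results keep the input order and the first rejection rejects the whole call.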
You can do this in one short function. (It returns the values in order, per naomik's suggestion. Thanks!)
/**
 * Performs a list of callable actions (promise factories) so
 * that only a limited number of promises are pending at any
 * given time.
 *
 * @param listOfCallableActions An array of callable functions,
 *     which should return promises.
 * @param limit The maximum number of promises to have pending
 *     at once.
 * @returns A Promise that resolves to the full list of values
 *     when everything is done.
 */
function throttleActions(listOfCallableActions, limit) {
  // We'll need to store which is the next promise in the list.
  let i = 0;
  let resultArray = new Array(listOfCallableActions.length);
  // Now define what happens when any of the actions completes.
  // JavaScript is (mostly) single-threaded, so only one
  // completion handler will call at a given time. Because we
  // return doNextAction, the Promise chain continues as long as
  // there's an action left in the list.
  function doNextAction() {
    if (i < listOfCallableActions.length) {
      // Save the current value of i, so we can put the result
      // in the right place
      let actionIndex = i++;
      let nextAction = listOfCallableActions[actionIndex];
      return Promise.resolve(nextAction()).then(result => {
        // Save results to the correct array index.
        resultArray[actionIndex] = result;
      }).then(doNextAction);
    }
  }
  // Now start up the original <limit> number of promises.
  // i advances in calls to doNextAction.
  let listOfPromises = [];
  while (i < limit && i < listOfCallableActions.length) {
    listOfPromises.push(doNextAction());
  }
  return Promise.all(listOfPromises).then(() => resultArray);
}
// Test harness:
function delay(name, ms) {
  return new Promise((resolve, reject) => setTimeout(() => {
    console.log(name);
    resolve(name);
  }, ms));
}
var ps = [];
for (let i = 0; i < 10; i++) {
  ps.push(() => delay("promise " + i, Math.random() * 3000));
}
throttleActions(ps, 3).then(result => console.log(result));
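The function above takes promise factories, whereas the question starts from a plain list of actions. A minimal sketch of that adaptation follows. To keep the block self-contained, doSomething is a stand-in for the question's server call, and runLimited is a hypothetical compact limiter with the same contract (factories in, ordered results out), written with async/await; it is not the answer's function.

```javascript
// Stand-in for the question's doSomething(action); resolves after 10 ms.
const doSomething = (action) =>
  new Promise((resolve) => setTimeout(resolve, 10, `done ${action}`));

// Hypothetical limiter: `limit` workers share one index counter, and each
// worker keeps taking the next unstarted factory until none are left.
async function runLimited(factories, limit) {
  const results = new Array(factories.length);
  let next = 0;
  async function worker() {
    while (next < factories.length) {
      const index = next++; // claim an index before awaiting
      results[index] = await factories[index]();
    }
  }
  const workerCount = Math.min(limit, factories.length);
  await Promise.all(Array.from({ length: workerCount }, worker));
  return results;
}

const listOfActions = ["a", "b", "c", "d", "e"];
// Wrap each call in a function so that nothing starts until the limiter says so.
const factories = listOfActions.map((action) => () => doSomething(action));
const allDone = runLimited(factories, 2);
allDone.then((results) => console.log(results));
// [ 'done a', 'done b', 'done c', 'done d', 'done e' ]
```

The wrapping step is the part that carries over: the same factories array could be passed to throttleActions unchanged.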
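Edit
Jeff Bowman has greatly improved his answer to resolve meaningful values. Feel free to view the history of this answer to understand why the resolved values are so important/useful.
throttlep
This solution closely mimics the native Promise.all
How it's the same…
- Resolves promises as quickly as possible
- Resolves an array of values in the same order as the inputs
- Rejects as soon as it encounters one rejection
How it's different…
- The Number parameter limits the number of simultaneously running Promises
- The Array input accepts promise creators (thunks), not actual Promises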
// throttlep :: Number -> [(* -> Promise)] -> Promise
const throttlep = n => Ps =>
  new Promise((pass, fail) => {
    // r is the number of promises still unresolved, xs collects the resolved values
    let r = Ps.length, xs = []
    // decrement r, save the resolved value in position i, run the next promise
    let next = i => x => (r--, xs[i] = x, run(Ps[n], n++))
    // if r is 0, resolve the final value xs; otherwise start P and chain next
    // (P is undefined once every thunk has been started, so guard the call)
    let run = (P, i) => r === 0 ? pass(xs) : P && P().then(next(i), fail)
    // an empty input resolves immediately
    if (r === 0) return pass(xs)
    // initialize by running the first n promises
    Ps.slice(0, n).forEach(run)
  })
// -----------------------------------------------------
// make sure it works
// delay :: (String, Number) -> Promise
const delay = (id, ms) =>
  new Promise(pass => {
    console.log(`running: ${id}`)
    setTimeout(pass, ms, id)
  })
// ps :: [(* -> Promise)]
let ps = new Array(10)
for (let i = 0; i < 10; i++) {
  ps[i] = () => delay(i, Math.random() * 3000)
}
// run a limit of 3 promises in parallel
// the first error will reject the entire pool
throttlep(3)(ps).then(
  xs => console.log('result:', xs),
  err => console.log('error:', err.message)
)
Console output
Inputs are run in order; the resolved results are in the same order as the inputs
running: 0
running: 1
running: 2
=> Promise {}
running: 3
running: 4
running: 5
running: 6
running: 7
running: 8
running: 9
result: [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
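Practical use
Let's look at a more practical code example. This code is tasked with fetching a set of images from a server. This is how we might use throttlep to limit the number of simultaneous requests to 3 at a time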
// getImage :: String -> Promise<base64>
let getImage = url => makeRequest(url).then(data => data.base64, reqErrorHandler)
// actions :: [(* -> Promise<base64>)]
let actions = [
  () => getImage('one.jpg'),
  () => getImage('two.jpg'),
  () => getImage('three.jpg'),
  () => getImage('four.jpg'),
  () => getImage('five.jpg')
]
// throttle the actions then do something...
throttlep(3)(actions).then(results => {
  // results are guaranteed to be ordered the same as the input array
  console.log(results)
  // [<base64>, <base64>, <base64>, <base64>, <base64>]
})
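The actions array above holds thunks, not promises, because a promise's executor runs as soon as the promise is constructed: by the time an array of promises exists, all of the work has already started and there is nothing left to throttle. A small sketch of the difference, where startTask is a hypothetical task that records when it starts:

```javascript
const started = [];

// Hypothetical task: records its start, then resolves with its id after 10 ms.
const startTask = (id) =>
  new Promise((resolve) => {
    started.push(id); // the executor runs synchronously, at construction time
    setTimeout(resolve, 10, id);
  });

// Eager: both tasks are already running once this array exists.
const promises = [startTask("a"), startTask("b")];
console.log(started); // [ 'a', 'b' ]

// Lazy: wrapping each call in a thunk defers the work until the thunk is called.
const thunks = [() => startTask("c"), () => startTask("d")];
console.log(started); // still [ 'a', 'b' ]

thunks[0]();
console.log(started); // [ 'a', 'b', 'c' ]
```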
Promises can be throttled using generators. The example below throttles them in groups of 3:
function asyncTask(duration = 1000) {
  return new Promise(resolve => {
    setTimeout(resolve, duration, duration)
  })
}
async function main() {
  const items = Array(10).fill(() => asyncTask());
  {
    // each step of the generator yields one whole batch of results
    const generator = batchThrottle(3, ...items)
    console.log('batch', (await generator.next()).value)
    for await (let result of generator) {
      console.log('remaining batch', result)
    }
  }
  {
    // each step of the generator yields a single result
    const generator = streamThrottle(3, ...items)
    console.log('stream', await generator.next())
    for await (let result of generator) {
      console.log('remaining stream', result)
    }
  }
}
async function* batchThrottle(n = 5, ...items) {
  while (items.length) {
    const tasks = items.splice(0, n).map(fn => fn())
    yield Promise.all(tasks)
  }
}
async function* streamThrottle(n = 5, ...items) {
  while (items.length) {
    const tasks = items.splice(0, n).map(fn => fn())
    yield* await Promise.all(tasks)
  }
}
main().catch(console.error)
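One property of the batch approach is worth seeing in action: because every batch is awaited with Promise.all, the next batch starts only after the slowest task of the current batch has finished, so for part of the time fewer than n tasks are running. The sketch below logs start and end events to show this. The names task, inBatches and demo, and the durations, are made up for illustration.

```javascript
const log = [];

// Hypothetical thunk factory: logs when the task starts and ends.
const task = (id, ms) => () => {
  log.push(`start ${id}`);
  return new Promise((resolve) =>
    setTimeout(() => {
      log.push(`end ${id}`);
      resolve(id);
    }, ms));
};

// Same idea as batchThrottle above: start up to n tasks, wait for all of them.
async function* inBatches(n, thunks) {
  for (let i = 0; i < thunks.length; i += n) {
    yield await Promise.all(thunks.slice(i, i + n).map((fn) => fn()));
  }
}

async function demo() {
  // task 0 is slow, so tasks 2 and 3 have to wait for it
  const thunks = [task(0, 30), task(1, 5), task(2, 5), task(3, 5)];
  const batches = [];
  for await (const batch of inBatches(2, thunks)) batches.push(batch);
  return { batches, log };
}

const demoDone = demo();
demoDone.then((result) => console.log(result.log.join(", ")));
// start 0, start 1, end 1, end 0, start 2, start 3, end 2, end 3
```

Task 1 ends early, yet task 2 starts only after task 0 has ended. The index-based approaches elsewhere in this thread start the next task as soon as any slot frees up.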
Here is a version of a throttle function that uses the async/await syntax:
async function throttle(tasks, max) {
  async function run(_, i) {
    values[i] = await tasks[i]();
    if (max < tasks.length) return run(_, max++);
  }
  const values = [];
  try {
    await Promise.all(tasks.slice(0, max).map(run));
  } catch (error) {
    max = tasks.length; // don't allow new tasks to start
    throw error;
  }
  return values;
}
// Demo
const delay = ms => new Promise(resolve => setTimeout(resolve, ms));
const tasks = Array.from({length: 10}, (_, i) =>
  async () => {
    console.log(`task ${i} starts`);
    await delay((1 + i % 3)*1000);
    console.log(`task ${i} ends with ${i*10}`);
    return i*10;
  }
);
throttle(tasks, 4).then(console.log);
Here is my take on this using TypeScript:
function ParallelMap<T, U>(array: U[], callbackFn: (element: U, index?: number, array?: U[]) => Promise<T>, maxDegreeOfParallelism: number = -1) {
  if (maxDegreeOfParallelism < -1 || maxDegreeOfParallelism == 0) return Promise.reject(`'maxDegreeOfParallelism' must be either -1 or greater than 0`);
  return new Promise<T[]>((resolve, reject) => {
    const inputArraySize = array.length;
    // Nothing to process: resolve right away instead of never settling.
    if (inputArraySize === 0) { resolve([]); return; }
    let indexTracker = 0;
    let completedTracker = 0;
    const output = new Array<T>(inputArraySize);
    const errors = new Array<{ index: number, error: any }>();
    const processNext = () => {
      const elementIndex = indexTracker++;
      const element = array[elementIndex];
      callbackFn(element, elementIndex, array).then(
        value => output[elementIndex] = value,
        reason => errors.push({ index: elementIndex, error: reason })
      ).finally(() => {
        ++completedTracker;
        if (completedTracker == inputArraySize) {
          // All done: reject with every collected error, or resolve with the output.
          if (errors.length > 0) reject(errors);
          else resolve(output);
        }
        else if (indexTracker < inputArraySize) processNext();
      });
    };
    for (let index = 0, count = maxDegreeOfParallelism < 0 ? inputArraySize : Math.min(maxDegreeOfParallelism, inputArraySize); index < count; ++index) {
      processNext();
    }
  });
}
Usage:
const maxDegreeOfParallelism = 3; // Number of concurrent tasks
const result = await ParallelMap(
  inputArray,
  async (value, index, array) => { /* Do something */ }, // Some async function to process each element
  maxDegreeOfParallelism
);
The same in JavaScript:
function ParallelMap(array, callbackFn, maxDegreeOfParallelism = -1) {
  if (maxDegreeOfParallelism < -1 || maxDegreeOfParallelism == 0) return Promise.reject(`'maxDegreeOfParallelism' must be either -1 or greater than 0`);
  return new Promise((resolve, reject) => {
    const inputArraySize = array.length;
    // Nothing to process: resolve right away instead of never settling.
    if (inputArraySize === 0) { resolve([]); return; }
    let indexTracker = 0;
    let completedTracker = 0;
    const output = new Array(inputArraySize);
    const errors = new Array();
    const processNext = () => {
      const elementIndex = indexTracker++;
      const element = array[elementIndex];
      callbackFn(element, elementIndex, array).then(
        value => output[elementIndex] = value,
        reason => errors.push({
          index: elementIndex,
          error: reason
        })
      ).finally(() => {
        ++completedTracker;
        if (completedTracker == inputArraySize) {
          if (errors.length > 0) reject(errors);
          else resolve(output);
        } else if (indexTracker < inputArraySize) processNext();
      });
    };
    for (let index = 0, count = maxDegreeOfParallelism < 0 ? inputArraySize : Math.min(maxDegreeOfParallelism, inputArraySize); index < count; ++index) {
      processNext();
    }
  });
}
// Usage
(async() => {
  const input = new Array(10).fill(1); // Array containing 10 '1' values
  const oneSecondTask = (value, index) => {
    return new Promise(resolve => {
      setTimeout(() => {
        resolve(value + index); // Extremely complex calculation of adding index to value 1
      }, 1000);
    });
  };
  console.log(`const input = [${input.join(', ')}];`);
  console.log(`---------------------------------------------`);
  console.log(`... wait for 10s ...`);
  console.log(`---------------------------------------------`);
  let start = Date.now();
  let maxDegreeOfParallelism = 1;
  let result = await ParallelMap(input, oneSecondTask, maxDegreeOfParallelism);
  console.log(`const result = [${result.join(', ')}];`);
  console.log(`${(Date.now() - start) / 1000}s to process ${input.length} items (taking 1s each) one at a time`);
  console.log(`---------------------------------------------`);
  start = Date.now();
  maxDegreeOfParallelism = 2;
  result = await ParallelMap(input, oneSecondTask, maxDegreeOfParallelism);
  console.log(`const result = [${result.join(', ')}];`);
  console.log(`${(Date.now() - start) / 1000}s to process ${input.length} items (taking 1s each) in parallel using ${maxDegreeOfParallelism} concurrent tasks`);
  console.log(`---------------------------------------------`);
  start = Date.now();
  maxDegreeOfParallelism = 5;
  result = await ParallelMap(input, oneSecondTask, maxDegreeOfParallelism);
  console.log(`const result = [${result.join(', ')}];`);
  console.log(`${(Date.now() - start) / 1000}s to process ${input.length} items (taking 1s each) in parallel using ${maxDegreeOfParallelism} concurrent tasks`);
  console.log(`---------------------------------------------`);
  start = Date.now();
  maxDegreeOfParallelism = 10;
  result = await ParallelMap(input, oneSecondTask, maxDegreeOfParallelism);
  console.log(`const result = [${result.join(', ')}];`);
  console.log(`${(Date.now() - start) / 1000}s to process ${input.length} items (taking 1s each) in parallel using ${maxDegreeOfParallelism} concurrent tasks`);
})();