并行调用一组承诺,但按顺序解决它们而不等待其他承诺解决
Call an array of promises in parallel, but resolve them in order without waiting for other promises to resolve
我有一组要并行调用但同步解析的承诺。
我编写了这段代码来完成所需的任务,但是,我需要创建自己的对象 QueryablePromise
来包装我可以同步检查以查看其已解决状态的本机 Promise
.
有没有更好的方法来完成这个不需要特殊对象的任务?
Please note. I do not want to use Promise.all
as I don't want to have to wait for all promises to resolve before processing the effects of the promises. And I cannot use async
functions in my code base.
const PROMISE = Symbol('PROMISE')
const tap = fn => x => (fn(x), x)
class QueryablePromise {
resolved = false
rejected = false
fulfilled = false
constructor(fn) {
this[PROMISE] = new Promise(fn)
.then(tap(() => {
this.fulfilled = true
this.resolved = true
}))
.catch(x => {
this.fulfilled = true
this.rejected = true
throw x
})
}
then(fn) {
this[PROMISE].then(fn)
return this
}
catch(fn) {
this[PROMISE].catch(fn)
return this
}
static resolve(x) {
return new QueryablePromise((res) => res(x))
}
static reject(x) {
return new QueryablePromise((_, rej) => rej(x))
}
}
/**
* parallelPromiseSynchronousResolve
*
* Call array of promises in parallel but resolve them in order
*
* @param {Array<QueryablePromise>} promises
* @praram {Array<fn>|fn} array of resolver function or single resolve function
*/
function parallelPromiseSynchronousResolve(promises, resolver) {
let lastResolvedIndex = 0
const resolvePromises = (promise, i) => {
promise.then(tap(x => {
// loop through all the promises starting at the lastResolvedIndex
for (; lastResolvedIndex < promises.length; lastResolvedIndex++) {
// if promise at the current index isn't resolved break the loop
if (!promises[lastResolvedIndex].resolved) {
break
}
// resolve the promise with the correct resolve function
promises[lastResolvedIndex].then(
Array.isArray(resolver)
? resolver[lastResolvedIndex]
: resolver
)
}
}))
}
promises.forEach(resolvePromises)
}
const timedPromise = (delay, label) =>
new QueryablePromise(res =>
setTimeout(() => {
console.log(label)
res(label)
}, delay)
)
parallelPromiseSynchronousResolve([
timedPromise(20, 'called first promise'),
timedPromise(60, 'called second promise'),
timedPromise(40, 'called third promise'),
], [
x => console.log('resolved first promise'),
x => console.log('resolved second promise'),
x => console.log('resolved third promise'),
])
<script src="https://codepen.io/synthet1c/pen/KyQQmL.js"></script>
欢迎任何帮助。
使用 for await...of
循环,如果你已经有了承诺数组,你可以很好地做到这一点:
const delay = ms => new Promise(resolve => { setTimeout(resolve, ms); });
const range = (length, mapFn) => Array.from({ length }, (_, index) => mapFn(index));
(async () => {
const promises = range(5, index => {
const ms = Math.round(Math.random() * 5000);
return delay(ms).then(() => ({ ms, index }));
});
const start = Date.now();
for await (const { ms, index } of promises) {
console.log(`index ${index} resolved at ${ms}, consumed at ${Date.now() - start}`);
}
})();
由于您不能使用异步函数,您可以通过使用 Array.prototype.reduce()
将承诺链接在一起并为每个链同步安排回调来模仿 for await...of
的效果:
const delay = ms => new Promise(resolve => { setTimeout(resolve, ms); });
const range = (length, mapFn) => Array.from({ length }, (_, index) => mapFn(index));
const asyncForEach = (array, cb) => array.reduce(
(chain, promise, index) => chain.then(
() => promise
).then(
value => cb(value, index)
),
Promise.resolve()
);
const promises = range(5, index => {
const ms = Math.round(Math.random() * 5000);
return delay(ms).then(() => ms);
});
const start = Date.now();
asyncForEach(promises, (ms, index) => {
console.log(`index ${index} resolved at ${ms}, consumed at ${Date.now() - start}`);
});
错误处理
由于 promise 被声明为并行实例化,我假设任何单个 promise 上的错误都不会传播到其他 promise,除非通过 asyncForEach()
构建的任何潜在的脆弱链(如上)。
但我们也希望在 asyncForEach()
中将承诺链接在一起时避免交叉传播错误。这是一种稳健地安排错误回调的方法,其中错误只能从原始承诺传播:
const delay = ms => new Promise(resolve => { setTimeout(resolve, ms); });
const maybe = p => p.then(v => Math.random() < 0.5 ? Promise.reject(v) : v);
const range = (length, mapFn) => Array.from({ length }, (_, index) => mapFn(index));
const asyncForEach = (array, fulfilled, rejected = () => {}) => array.reduce(
(chain, promise, index) => {
promise.catch(() => {}); // catch early rejection until handled below by chain
return chain.then(
() => promise,
() => promise // catch rejected chain and settle with promise at index
).then(
value => fulfilled(value, index),
error => rejected(error, index)
);
},
Promise.resolve()
);
const promises = range(5, index => {
const ms = Math.round(Math.random() * 5000);
return maybe(delay(ms).then(() => ms)); // promises can fulfill or reject
});
const start = Date.now();
const settled = state => (ms, index) => {
console.log(`index ${index} ${state}ed at ${ms}, consumed at ${Date.now() - start}`);
};
asyncForEach(
promises,
settled('fulfill'),
settled('reject') // indexed callback for rejected state
);
这里唯一需要注意的是,传递给 asyncForEach()
的回调中抛出的任何错误都将被链中的错误处理吞没,除了数组最后一个索引上的回调中抛出的错误.
我建议确实使用 Promise.all
- 但不是一次对所有承诺,而是你希望为每个步骤实现的所有承诺。您可以使用 reduce
:
创建此 "tree list" 的承诺
function parallelPromisesSequentialReduce(promises, reducer, initial) {
return promises.reduce((acc, promise, i) => {
return Promise.all([acc, promise]).then(([prev, res]) => reducer(prev, res, i));
}, Promise.resolve(initial));
}
const timedPromise = (delay, label) => new Promise(resolve =>
setTimeout(() => {
console.log('fulfilled ' + label + ' promise');
resolve(label);
}, delay)
);
parallelPromisesSequentialReduce([
timedPromise(20, 'first'),
timedPromise(60, 'second'),
timedPromise(40, 'third'),
], (acc, res) => {
console.log('combining ' + res + ' promise with previous result (' + acc + ')');
acc.push(res);
return acc;
}, []).then(res => {
console.log('final result', res);
}, console.error);
我有一组要并行调用但同步解析的承诺。
我编写了这段代码来完成所需的任务,但是,我需要创建自己的对象 QueryablePromise
来包装我可以同步检查以查看其已解决状态的本机 Promise
.
有没有更好的方法来完成这个不需要特殊对象的任务?
Please note. I do not want to use
Promise.all
as I don't want to have to wait for all promises to resolve before processing the effects of the promises. And I cannot useasync
functions in my code base.
const PROMISE = Symbol('PROMISE')
const tap = fn => x => (fn(x), x)
class QueryablePromise {
resolved = false
rejected = false
fulfilled = false
constructor(fn) {
this[PROMISE] = new Promise(fn)
.then(tap(() => {
this.fulfilled = true
this.resolved = true
}))
.catch(x => {
this.fulfilled = true
this.rejected = true
throw x
})
}
then(fn) {
this[PROMISE].then(fn)
return this
}
catch(fn) {
this[PROMISE].catch(fn)
return this
}
static resolve(x) {
return new QueryablePromise((res) => res(x))
}
static reject(x) {
return new QueryablePromise((_, rej) => rej(x))
}
}
/**
* parallelPromiseSynchronousResolve
*
* Call array of promises in parallel but resolve them in order
*
* @param {Array<QueryablePromise>} promises
* @praram {Array<fn>|fn} array of resolver function or single resolve function
*/
function parallelPromiseSynchronousResolve(promises, resolver) {
let lastResolvedIndex = 0
const resolvePromises = (promise, i) => {
promise.then(tap(x => {
// loop through all the promises starting at the lastResolvedIndex
for (; lastResolvedIndex < promises.length; lastResolvedIndex++) {
// if promise at the current index isn't resolved break the loop
if (!promises[lastResolvedIndex].resolved) {
break
}
// resolve the promise with the correct resolve function
promises[lastResolvedIndex].then(
Array.isArray(resolver)
? resolver[lastResolvedIndex]
: resolver
)
}
}))
}
promises.forEach(resolvePromises)
}
const timedPromise = (delay, label) =>
new QueryablePromise(res =>
setTimeout(() => {
console.log(label)
res(label)
}, delay)
)
parallelPromiseSynchronousResolve([
timedPromise(20, 'called first promise'),
timedPromise(60, 'called second promise'),
timedPromise(40, 'called third promise'),
], [
x => console.log('resolved first promise'),
x => console.log('resolved second promise'),
x => console.log('resolved third promise'),
])
<script src="https://codepen.io/synthet1c/pen/KyQQmL.js"></script>
欢迎任何帮助。
使用 for await...of
循环,如果你已经有了承诺数组,你可以很好地做到这一点:
const delay = ms => new Promise(resolve => { setTimeout(resolve, ms); });
const range = (length, mapFn) => Array.from({ length }, (_, index) => mapFn(index));
(async () => {
const promises = range(5, index => {
const ms = Math.round(Math.random() * 5000);
return delay(ms).then(() => ({ ms, index }));
});
const start = Date.now();
for await (const { ms, index } of promises) {
console.log(`index ${index} resolved at ${ms}, consumed at ${Date.now() - start}`);
}
})();
由于您不能使用异步函数,您可以通过使用 Array.prototype.reduce()
将承诺链接在一起并为每个链同步安排回调来模仿 for await...of
的效果:
const delay = ms => new Promise(resolve => { setTimeout(resolve, ms); });
const range = (length, mapFn) => Array.from({ length }, (_, index) => mapFn(index));
const asyncForEach = (array, cb) => array.reduce(
(chain, promise, index) => chain.then(
() => promise
).then(
value => cb(value, index)
),
Promise.resolve()
);
const promises = range(5, index => {
const ms = Math.round(Math.random() * 5000);
return delay(ms).then(() => ms);
});
const start = Date.now();
asyncForEach(promises, (ms, index) => {
console.log(`index ${index} resolved at ${ms}, consumed at ${Date.now() - start}`);
});
错误处理
由于 promise 被声明为并行实例化,我假设任何单个 promise 上的错误都不会传播到其他 promise,除非通过 asyncForEach()
构建的任何潜在的脆弱链(如上)。
但我们也希望在 asyncForEach()
中将承诺链接在一起时避免交叉传播错误。这是一种稳健地安排错误回调的方法,其中错误只能从原始承诺传播:
const delay = ms => new Promise(resolve => { setTimeout(resolve, ms); });
const maybe = p => p.then(v => Math.random() < 0.5 ? Promise.reject(v) : v);
const range = (length, mapFn) => Array.from({ length }, (_, index) => mapFn(index));
const asyncForEach = (array, fulfilled, rejected = () => {}) => array.reduce(
(chain, promise, index) => {
promise.catch(() => {}); // catch early rejection until handled below by chain
return chain.then(
() => promise,
() => promise // catch rejected chain and settle with promise at index
).then(
value => fulfilled(value, index),
error => rejected(error, index)
);
},
Promise.resolve()
);
const promises = range(5, index => {
const ms = Math.round(Math.random() * 5000);
return maybe(delay(ms).then(() => ms)); // promises can fulfill or reject
});
const start = Date.now();
const settled = state => (ms, index) => {
console.log(`index ${index} ${state}ed at ${ms}, consumed at ${Date.now() - start}`);
};
asyncForEach(
promises,
settled('fulfill'),
settled('reject') // indexed callback for rejected state
);
这里唯一需要注意的是,传递给 asyncForEach()
的回调中抛出的任何错误都将被链中的错误处理吞没,除了数组最后一个索引上的回调中抛出的错误.
我建议确实使用 Promise.all
- 但不是一次对所有承诺,而是你希望为每个步骤实现的所有承诺。您可以使用 reduce
:
function parallelPromisesSequentialReduce(promises, reducer, initial) {
return promises.reduce((acc, promise, i) => {
return Promise.all([acc, promise]).then(([prev, res]) => reducer(prev, res, i));
}, Promise.resolve(initial));
}
const timedPromise = (delay, label) => new Promise(resolve =>
setTimeout(() => {
console.log('fulfilled ' + label + ' promise');
resolve(label);
}, delay)
);
parallelPromisesSequentialReduce([
timedPromise(20, 'first'),
timedPromise(60, 'second'),
timedPromise(40, 'third'),
], (acc, res) => {
console.log('combining ' + res + ' promise with previous result (' + acc + ')');
acc.push(res);
return acc;
}, []).then(res => {
console.log('final result', res);
}, console.error);