连续执行一批承诺。 Promise.all 完成后转到下一批
Execute batch of promises in series. Once Promise.all is done go to the next batch
我有一个包含承诺数组的数组,每个内部数组可以有 4k、2k 或 500 个承诺。
总共有大约 60k 个承诺,我也可以用其他值对其进行测试。
现在我需要执行Promise.all(BigArray[0])
。
完成第一个内部数组后,我需要执行下一个 Promise.all(BigArray[1])
依此类推。
如果我尝试执行 Promise.all(BigArray)
它会抛出:
fatal error call_and_retry_2 allocation failed - process out of memory
我需要按顺序执行每个 promise,而不是并行执行,我认为这就是 Node 正在做的事情。我不应该使用新库,但我愿意考虑答案!
编辑:
这是一段代码示例:
function getInfoForEveryInnerArgument(InnerArray) {
const CPTPromises = _.map(InnerArray, (argument) => getDBInfo(argument));
return Promise.all(CPTPromises)
.then((results) => {
return doSomethingWithResults(results);
});
}
function mainFunction() {
BigArray = [[argument1, argument2, argument3, argument4], [argument5, argument6, argument7, argument8], ....];
//the summ of all arguments is over 60k...
const promiseArrayCombination = _.map(BigArray, (InnerArray, key) => getInfoForEveryInnerArgument(InnerArray));
Promise.all(promiseArrayCombination).then((fullResults) => {
console.log(fullResults);
return fullResults;
})
}
你可以递归地做,例如这里我需要在mongo中放置大约60k个文档,但是它太大了,一步完成,所以我拿了1k个文档,把它们发送到mongo,完成后我又拿了 1k 份文件等
exports.rawRecursive = (arr, start) => {
//ending condition
if (start > arr.length) {
return;
}
Rawmedicament.insertManyAsync(_.slice(arr, start, start + 1000)).then(() => {
//recursive
exports.rawRecursive(arr, start + 1000);
});
};
如果你想注意,当一切都完成后,你可以在结束条件下放置回调,或者如果你喜欢 Promises,你可以在那里调用 resolve()。
您的问题名称有点错误,这可能使一些人对此问题和该问题的先前版本感到困惑。您正在尝试连续执行一批异步操作,一批操作,然后在完成后执行另一批操作。这些异步操作的结果通过 promises 进行跟踪。 Promises 本身代表已经开始的异步操作。 "Promises" 不会自己执行。所以从技术上讲,你不需要 "execute a batch of promises in series"。您执行一组操作,使用承诺跟踪它们的结果,然后在第一批全部完成后执行下一批。
无论如何,这里有一个序列化每批操作的解决方案。
您可以创建一个我通常称之为 next()
的内部函数,让您处理每次迭代。当 promise 通过处理一个 innerArray 解决时,您再次调用 next()
:
function mainFunction() {
return new Promise(function(resolve, reject) {
var bigArray = [[argument1, argument2, argument3, argument4], [argument5, argument6, argument7, argument8], ....];
//the summ of all arguments is over 60k...
var results = [];
var index = 0;
function next() {
if (index < bigArray.length) {
getInfoForEveryInnerArgument(bigArray[index++]).then(function(data) {
results.push(data);
next();
}, reject);
} else {
resolve(results);
}
}
// start first iteration
next();
});
}
这还将所有子结果收集到一个结果数组中,returns 已解析值的 master promise 就是这个结果数组。所以,你可以这样使用:
mainFunction().then(function(results) {
// final results array here and everything done
}, function(err) {
// some error here
});
您还可以使用 .reduce()
设计模式来连续迭代数组:
function mainFunction() {
var bigArray = [[argument1, argument2, argument3, argument4], [argument5, argument6, argument7, argument8], ....];
return bigArray.reduce(function(p, item) {
return p.then(function(results) {
return getInfoForEveryInnerArgument(item).then(function(data) {
results.push(data);
return results;
})
});
}, Promise.resolve([]));
}
这比第一个选项创建了更多的同步承诺,我不知道这对于如此大的承诺集是否是一个问题(这就是我提供原始选项的原因),但这段代码更清晰并且概念也很方便用于其他情况。
仅供参考,有一些 promise 附加功能专为您执行此操作而构建。在 Bluebird promise library(这是一个很棒的使用 promises 进行开发的库)中,他们有 Promise.map()
是为此而制作的:
function mainFunction() {
var bigArray = [[argument1, argument2, argument3, argument4], [argument5, argument6, argument7, argument8], ....];
return Promise.map(bigArray, getInfoForEveryInnerArgument);
}
此外,如果原始数组不是promises而是应该处理的对象,批处理可以在没有外部依赖的情况下使用Array.prototype.map()
, Array.prototype.slice()
and Promise.all()
:
的组合来完成
// Main batch parallelization function.
function batch(tasks, pstart, atonce, runner, pos) {
if (!pos) pos = 0;
if (pos >= tasks.length) return pstart;
var p = pstart.then(function() {
output('Batch:', pos / atonce + 1);
return Promise.all(tasks.slice(pos, pos + atonce).map(function(task) {
return runner(task);
}));
});
return batch(tasks, p, atonce, runner, pos + atonce);
}
// Output function for the example
function output() {
document.getElementById("result").innerHTML += Array.prototype.slice.call(arguments).join(' ') + "<br />";
window.scrollTo(0, document.body.scrollHeight);
}
/*
* Example code.
* Note: Task runner should return Promise.
*/
function taskrunner(task) {
return new Promise(function(resolve, reject) {
setTimeout(function() {
output('Processed:', task.text, 'Delay:', task.delay);
resolve();
}, task.delay);
});
}
var taskarray = [];
function populatetasks(size) {
taskarray = [];
for (var i = 0; i < size; i++) {
taskarray.push({
delay: 500 + Math.ceil(Math.random() * 50) * 10,
text: 'Item ' + (i + 1)
});
}
}
function clean() {
document.getElementById("result").innerHTML = '';
}
var init = Promise.resolve();
function start() {
var bsize = parseInt(document.getElementById("batchsize").value, 10),
tsize = parseInt(document.getElementById("taskssize").value, 10);
populatetasks(tsize);
init = batch(taskarray.slice() /*tasks array*/ , init /*starting promise*/ , bsize /*batch size*/ , taskrunner /*task runner*/ );
}
<input type="button" onclick="start()" value="Start" />
<input type="button" onclick="clean()" value="Clear" /> Batch size:
<input id="batchsize" value="4" size="2"/> Tasks:
<input id="taskssize" value="10" size="2"/>
<pre id="result" />
@jfriend00 只需使用 async/await
和 reduce
:
添加您的答案
function runPromisesInSeries(bigArray, getInfoForEveryInnerArgument) {
try {
return bigArray.reduce(async (acc, cItem) => {
const results = await acc
const data = await getInfoForEveryInnerArgument(cItem)
results.push(data)
return results
}, Promise.resolve([]))
} catch (err) {
throw err
}
}
动态批处理更多承诺
一个简单的实现,您可以将任务队列并行批处理到 运行 并动态添加:
class TaskQueue {
constructor ({
makeTask,
initialData = [],
getId = data => data.id,
batchSize = 15,
onComplete = () => {},
}) {
if (!makeTask) throw new Error('The "makeTask" parameter is required');
this.makeTask = makeTask;
this.getId = getId;
this.batchSize = batchSize;
this.onComplete = onComplete;
this.queue = new Map();
this.add(initialData);
}
add(...data) {
data.forEach(item => {
const id = this.getId(item);
if (this.queue.has(id)) return;
this.queue.set(id, item);
});
// running automatically on create or additional items added
this.runNextBatch();
}
runNextBatch () {
if (this.queueStarted) return;
if (this.queue.size === 0) return;
this.queueStarted = true;
const currentBatchData = Array.from(this.queue.values()).slice(0, this.batchSize);
const tasks = currentBatchData.map(data => {
const id = this.getId(data);
// Have some error handling implemented in `makeTask`
this.makeTask(data)
.finally(() => this.queue.delete(id));
});
return Promise.all(tasks)
.then(() => {
this.queueStarted = false;
this.runNextBatch();
})
.finally(() => {
this.queueStarted = false;
if (this.queue.size === 0) this.onComplete();
});
}
}
// Usage
const lotOfFilesForUpload = [{ uri: 'file://some-path' }, { uri: 'file://some-other-path' }];
const upload = (file) => console.log('fake uploading file: ', file);
const taskQueue = new TaskQueue({
initialData: lotOfFilesForUpload,
getId: file => file.uri,
makeTask: file => upload(file),
onComplete: () => console.log('Queue completed'),
});
// You can add more tasks dynamically
taskQueue.add({ uri: 'file://yet-another-file' });
2020 年 10 月的回答。Async/await 简而言之:只有 10 行代码+JSDoc。
/**
* Same as Promise.all(items.map(item => task(item))), but it waits for
* the first {batchSize} promises to finish before starting the next batch.
*
* @template A
* @template B
* @param {function(A): B} task The task to run for each item.
* @param {A[]} items Arguments to pass to the task for each call.
* @param {int} batchSize
* @returns {B[]}
*/
async promiseAllInBatches(task, items, batchSize) {
let position = 0;
let results = [];
while (position < items.length) {
const itemsForBatch = items.slice(position, position + batchSize);
results = [...results, ...await Promise.all(itemsForBatch.map(item => task(item)))];
position += batchSize;
}
return results;
}
我有一个包含承诺数组的数组,每个内部数组可以有 4k、2k 或 500 个承诺。
总共有大约 60k 个承诺,我也可以用其他值对其进行测试。
现在我需要执行Promise.all(BigArray[0])
。
完成第一个内部数组后,我需要执行下一个 Promise.all(BigArray[1])
依此类推。
如果我尝试执行 Promise.all(BigArray)
它会抛出:
fatal error call_and_retry_2 allocation failed - process out of memory
我需要按顺序执行每个 promise,而不是并行执行,我认为这就是 Node 正在做的事情。我不应该使用新库,但我愿意考虑答案!
编辑:
这是一段代码示例:
function getInfoForEveryInnerArgument(InnerArray) {
const CPTPromises = _.map(InnerArray, (argument) => getDBInfo(argument));
return Promise.all(CPTPromises)
.then((results) => {
return doSomethingWithResults(results);
});
}
function mainFunction() {
BigArray = [[argument1, argument2, argument3, argument4], [argument5, argument6, argument7, argument8], ....];
//the summ of all arguments is over 60k...
const promiseArrayCombination = _.map(BigArray, (InnerArray, key) => getInfoForEveryInnerArgument(InnerArray));
Promise.all(promiseArrayCombination).then((fullResults) => {
console.log(fullResults);
return fullResults;
})
}
你可以递归地做,例如这里我需要在mongo中放置大约60k个文档,但是它太大了,一步完成,所以我拿了1k个文档,把它们发送到mongo,完成后我又拿了 1k 份文件等
exports.rawRecursive = (arr, start) => {
//ending condition
if (start > arr.length) {
return;
}
Rawmedicament.insertManyAsync(_.slice(arr, start, start + 1000)).then(() => {
//recursive
exports.rawRecursive(arr, start + 1000);
});
};
如果你想注意,当一切都完成后,你可以在结束条件下放置回调,或者如果你喜欢 Promises,你可以在那里调用 resolve()。
您的问题名称有点错误,这可能使一些人对此问题和该问题的先前版本感到困惑。您正在尝试连续执行一批异步操作,一批操作,然后在完成后执行另一批操作。这些异步操作的结果通过 promises 进行跟踪。 Promises 本身代表已经开始的异步操作。 "Promises" 不会自己执行。所以从技术上讲,你不需要 "execute a batch of promises in series"。您执行一组操作,使用承诺跟踪它们的结果,然后在第一批全部完成后执行下一批。
无论如何,这里有一个序列化每批操作的解决方案。
您可以创建一个我通常称之为 next()
的内部函数,让您处理每次迭代。当 promise 通过处理一个 innerArray 解决时,您再次调用 next()
:
function mainFunction() {
return new Promise(function(resolve, reject) {
var bigArray = [[argument1, argument2, argument3, argument4], [argument5, argument6, argument7, argument8], ....];
//the summ of all arguments is over 60k...
var results = [];
var index = 0;
function next() {
if (index < bigArray.length) {
getInfoForEveryInnerArgument(bigArray[index++]).then(function(data) {
results.push(data);
next();
}, reject);
} else {
resolve(results);
}
}
// start first iteration
next();
});
}
这还将所有子结果收集到一个结果数组中,returns 已解析值的 master promise 就是这个结果数组。所以,你可以这样使用:
mainFunction().then(function(results) {
// final results array here and everything done
}, function(err) {
// some error here
});
您还可以使用 .reduce()
设计模式来连续迭代数组:
function mainFunction() {
var bigArray = [[argument1, argument2, argument3, argument4], [argument5, argument6, argument7, argument8], ....];
return bigArray.reduce(function(p, item) {
return p.then(function(results) {
return getInfoForEveryInnerArgument(item).then(function(data) {
results.push(data);
return results;
})
});
}, Promise.resolve([]));
}
这比第一个选项创建了更多的同步承诺,我不知道这对于如此大的承诺集是否是一个问题(这就是我提供原始选项的原因),但这段代码更清晰并且概念也很方便用于其他情况。
仅供参考,有一些 promise 附加功能专为您执行此操作而构建。在 Bluebird promise library(这是一个很棒的使用 promises 进行开发的库)中,他们有 Promise.map()
是为此而制作的:
function mainFunction() {
var bigArray = [[argument1, argument2, argument3, argument4], [argument5, argument6, argument7, argument8], ....];
return Promise.map(bigArray, getInfoForEveryInnerArgument);
}
此外,如果原始数组不是promises而是应该处理的对象,批处理可以在没有外部依赖的情况下使用Array.prototype.map()
, Array.prototype.slice()
and Promise.all()
:
// Main batch parallelization function.
function batch(tasks, pstart, atonce, runner, pos) {
if (!pos) pos = 0;
if (pos >= tasks.length) return pstart;
var p = pstart.then(function() {
output('Batch:', pos / atonce + 1);
return Promise.all(tasks.slice(pos, pos + atonce).map(function(task) {
return runner(task);
}));
});
return batch(tasks, p, atonce, runner, pos + atonce);
}
// Output function for the example
function output() {
document.getElementById("result").innerHTML += Array.prototype.slice.call(arguments).join(' ') + "<br />";
window.scrollTo(0, document.body.scrollHeight);
}
/*
* Example code.
* Note: Task runner should return Promise.
*/
function taskrunner(task) {
return new Promise(function(resolve, reject) {
setTimeout(function() {
output('Processed:', task.text, 'Delay:', task.delay);
resolve();
}, task.delay);
});
}
var taskarray = [];
function populatetasks(size) {
taskarray = [];
for (var i = 0; i < size; i++) {
taskarray.push({
delay: 500 + Math.ceil(Math.random() * 50) * 10,
text: 'Item ' + (i + 1)
});
}
}
function clean() {
document.getElementById("result").innerHTML = '';
}
var init = Promise.resolve();
function start() {
var bsize = parseInt(document.getElementById("batchsize").value, 10),
tsize = parseInt(document.getElementById("taskssize").value, 10);
populatetasks(tsize);
init = batch(taskarray.slice() /*tasks array*/ , init /*starting promise*/ , bsize /*batch size*/ , taskrunner /*task runner*/ );
}
<input type="button" onclick="start()" value="Start" />
<input type="button" onclick="clean()" value="Clear" /> Batch size:
<input id="batchsize" value="4" size="2"/> Tasks:
<input id="taskssize" value="10" size="2"/>
<pre id="result" />
@jfriend00 只需使用 async/await
和 reduce
:
function runPromisesInSeries(bigArray, getInfoForEveryInnerArgument) {
try {
return bigArray.reduce(async (acc, cItem) => {
const results = await acc
const data = await getInfoForEveryInnerArgument(cItem)
results.push(data)
return results
}, Promise.resolve([]))
} catch (err) {
throw err
}
}
动态批处理更多承诺
一个简单的实现,您可以将任务队列并行批处理到 运行 并动态添加:
class TaskQueue {
constructor ({
makeTask,
initialData = [],
getId = data => data.id,
batchSize = 15,
onComplete = () => {},
}) {
if (!makeTask) throw new Error('The "makeTask" parameter is required');
this.makeTask = makeTask;
this.getId = getId;
this.batchSize = batchSize;
this.onComplete = onComplete;
this.queue = new Map();
this.add(initialData);
}
add(...data) {
data.forEach(item => {
const id = this.getId(item);
if (this.queue.has(id)) return;
this.queue.set(id, item);
});
// running automatically on create or additional items added
this.runNextBatch();
}
runNextBatch () {
if (this.queueStarted) return;
if (this.queue.size === 0) return;
this.queueStarted = true;
const currentBatchData = Array.from(this.queue.values()).slice(0, this.batchSize);
const tasks = currentBatchData.map(data => {
const id = this.getId(data);
// Have some error handling implemented in `makeTask`
this.makeTask(data)
.finally(() => this.queue.delete(id));
});
return Promise.all(tasks)
.then(() => {
this.queueStarted = false;
this.runNextBatch();
})
.finally(() => {
this.queueStarted = false;
if (this.queue.size === 0) this.onComplete();
});
}
}
// Usage
const lotOfFilesForUpload = [{ uri: 'file://some-path' }, { uri: 'file://some-other-path' }];
const upload = (file) => console.log('fake uploading file: ', file);
const taskQueue = new TaskQueue({
initialData: lotOfFilesForUpload,
getId: file => file.uri,
makeTask: file => upload(file),
onComplete: () => console.log('Queue completed'),
});
// You can add more tasks dynamically
taskQueue.add({ uri: 'file://yet-another-file' });
2020 年 10 月的回答。Async/await 简而言之:只有 10 行代码+JSDoc。
/**
* Same as Promise.all(items.map(item => task(item))), but it waits for
* the first {batchSize} promises to finish before starting the next batch.
*
* @template A
* @template B
* @param {function(A): B} task The task to run for each item.
* @param {A[]} items Arguments to pass to the task for each call.
* @param {int} batchSize
* @returns {B[]}
*/
async promiseAllInBatches(task, items, batchSize) {
let position = 0;
let results = [];
while (position < items.length) {
const itemsForBatch = items.slice(position, position + batchSize);
results = [...results, ...await Promise.all(itemsForBatch.map(item => task(item)))];
position += batchSize;
}
return results;
}