连续执行一批承诺。 Promise.all 完成后转到下一批

Execute batch of promises in series. Once Promise.all is done go to the next batch

我有一个包含承诺数组的数组,每个内部数组可以有 4k、2k 或 500 个承诺。

总共有大约 60k 个承诺,我也可以用其他值对其进行测试。

现在我需要执行Promise.all(BigArray[0])

完成第一个内部数组后,我需要执行下一个 Promise.all(BigArray[1]) 依此类推。

如果我尝试执行 Promise.all(BigArray) 它会抛出:

fatal error call_and_retry_2 allocation failed - process out of memory

我需要按顺序执行每个 promise,而不是并行执行,我认为这就是 Node 正在做的事情。我不应该使用新库,但我愿意考虑答案!

编辑:

这是一段代码示例:

function getInfoForEveryInnerArgument(InnerArray) {
    const CPTPromises = _.map(InnerArray, (argument) => getDBInfo(argument));
    return Promise.all(CPTPromises)
        .then((results) => {
            return doSomethingWithResults(results);
        });
}
function mainFunction() {
    BigArray = [[argument1, argument2, argument3, argument4], [argument5, argument6, argument7, argument8], ....];
    //the summ of all arguments is over 60k...
    const promiseArrayCombination = _.map(BigArray, (InnerArray, key) => getInfoForEveryInnerArgument(InnerArray));

    Promise.all(promiseArrayCombination).then((fullResults) => {
        console.log(fullResults);
        return fullResults;
    })
}

你可以递归地做,例如这里我需要在mongo中放置大约60k个文档,但是它太大了,一步完成,所以我拿了1k个文档,把它们发送到mongo,完成后我又拿了 1k 份文件等

exports.rawRecursive = (arr, start) => {
        //ending condition
        if (start > arr.length) {
            return;
        }

        Rawmedicament.insertManyAsync(_.slice(arr, start, start + 1000)).then(() => {
            //recursive
            exports.rawRecursive(arr, start + 1000);
        });
};

如果你想注意,当一切都完成后,你可以在结束条件下放置回调,或者如果你喜欢 Promises,你可以在那里调用 resolve()。

您的问题名称有点错误,这可能使一些人对此问题和该问题的先前版本感到困惑。您正在尝试连续执行一批异步操作,一批操作,然后在完成后执行另一批操作。这些异步操作的结果通过 promises 进行跟踪。 Promises 本身代表已经开始的异步操作。 "Promises" 不会自己执行。所以从技术上讲,你不需要 "execute a batch of promises in series"。您执行一组操作,使用承诺跟踪它们的结果,然后在第一批全部完成后执行下一批。

无论如何,这里有一个序列化每批操作的解决方案。

您可以创建一个我通常称之为 next() 的内部函数,让您处理每次迭代。当 promise 通过处理一个 innerArray 解决时,您再次调用 next()

function mainFunction() {
    return new Promise(function(resolve, reject) {
        var bigArray = [[argument1, argument2, argument3, argument4], [argument5, argument6, argument7, argument8], ....];
        //the summ of all arguments is over 60k...
        var results = [];

        var index = 0;
        function next() {
            if (index < bigArray.length) {
                getInfoForEveryInnerArgument(bigArray[index++]).then(function(data) {
                    results.push(data);
                    next();
                }, reject);
            } else {
                resolve(results);
            }
        }
        // start first iteration
        next();
    });
}

这还将所有子结果收集到一个结果数组中,returns 已解析值的 master promise 就是这个结果数组。所以,你可以这样使用:

mainFunction().then(function(results) {
    // final results array here and everything done
}, function(err) {
    // some error here
});

您还可以使用 .reduce() 设计模式来连续迭代数组:

function mainFunction() {
    var bigArray = [[argument1, argument2, argument3, argument4], [argument5, argument6, argument7, argument8], ....];
    return bigArray.reduce(function(p, item) {
        return p.then(function(results) {
            return getInfoForEveryInnerArgument(item).then(function(data) {
                results.push(data);
                return results;
            })
        });
    }, Promise.resolve([]));
}

这比第一个选项创建了更多的同步承诺,我不知道这对于如此大的承诺集是否是一个问题(这就是我提供原始选项的原因),但这段代码更清晰并且概念也很方便用于其他情况。


仅供参考,有一些 promise 附加功能专为您执行此操作而构建。在 Bluebird promise library(这是一个很棒的使用 promises 进行开发的库)中,他们有 Promise.map() 是为此而制作的:

function mainFunction() {
    var bigArray = [[argument1, argument2, argument3, argument4], [argument5, argument6, argument7, argument8], ....];
    return Promise.map(bigArray, getInfoForEveryInnerArgument);

}

此外,如果原始数组不是promises而是应该处理的对象,批处理可以在没有外部依赖的情况下使用Array.prototype.map(), Array.prototype.slice() and Promise.all():

的组合来完成

// Main batch parallelization function.
function batch(tasks, pstart, atonce, runner, pos) {
  if (!pos) pos = 0;
  if (pos >= tasks.length) return pstart;
  var p = pstart.then(function() {
    output('Batch:', pos / atonce + 1);
    return Promise.all(tasks.slice(pos, pos + atonce).map(function(task) {
      return runner(task);
    }));
  });
  return batch(tasks, p, atonce, runner, pos + atonce);
}

// Output function for the example
function output() {
  document.getElementById("result").innerHTML += Array.prototype.slice.call(arguments).join(' ') + "<br />";
  window.scrollTo(0, document.body.scrollHeight);
}

/*
 * Example code.
 * Note: Task runner should return Promise.
 */
function taskrunner(task) {
  return new Promise(function(resolve, reject) {
    setTimeout(function() {
      output('Processed:', task.text, 'Delay:', task.delay);
      resolve();
    }, task.delay);
  });
}

var taskarray = [];
function populatetasks(size) {
  taskarray = [];
  for (var i = 0; i < size; i++) {
    taskarray.push({
      delay: 500 + Math.ceil(Math.random() * 50) * 10,
      text: 'Item ' + (i + 1)
    });
  }
}

function clean() {
  document.getElementById("result").innerHTML = '';
}

var init = Promise.resolve();
function start() {
  var bsize = parseInt(document.getElementById("batchsize").value, 10),
    tsize = parseInt(document.getElementById("taskssize").value, 10);
  populatetasks(tsize);
  init = batch(taskarray.slice() /*tasks array*/ , init /*starting promise*/ , bsize /*batch size*/ , taskrunner /*task runner*/ );
}
<input type="button" onclick="start()" value="Start" />
<input type="button" onclick="clean()" value="Clear" />&nbsp;Batch size:&nbsp;
<input id="batchsize" value="4" size="2"/>&nbsp;Tasks:&nbsp;
<input id="taskssize" value="10" size="2"/>
<pre id="result" />

@jfriend00 只需使用 async/awaitreduce:

添加您的答案
function runPromisesInSeries(bigArray, getInfoForEveryInnerArgument) {
  try {
    return bigArray.reduce(async (acc, cItem) => {
      const results = await acc
      const data = await getInfoForEveryInnerArgument(cItem)
      results.push(data)
      return results
    }, Promise.resolve([]))
  } catch (err) {
    throw err
  }
}

动态批处理更多承诺

一个简单的实现,您可以将任务队列并行批处理到 运行 并动态添加:

class TaskQueue {
  constructor ({
    makeTask,
    initialData = [],
    getId = data => data.id,
    batchSize = 15,
    onComplete = () => {},
  }) {
    if (!makeTask) throw new Error('The "makeTask" parameter is required');

    this.makeTask = makeTask;
    this.getId = getId;
    this.batchSize = batchSize;
    this.onComplete = onComplete;
    this.queue = new Map();

    this.add(initialData);
  }

  add(...data) {
    data.forEach(item => {
      const id = this.getId(item);
      if (this.queue.has(id)) return;

      this.queue.set(id, item);
    });

    // running automatically on create or additional items added
    this.runNextBatch();
  }

  runNextBatch () {
    if (this.queueStarted) return;
    if (this.queue.size === 0) return;

    this.queueStarted = true;
    const currentBatchData = Array.from(this.queue.values()).slice(0, this.batchSize);

    const tasks = currentBatchData.map(data => {
      const id = this.getId(data);

      // Have some error handling implemented in `makeTask`
      this.makeTask(data)
        .finally(() => this.queue.delete(id));
    });

    return Promise.all(tasks)
      .then(() => {
        this.queueStarted = false;
        this.runNextBatch();
      })
      .finally(() => {
        this.queueStarted = false;
        if (this.queue.size === 0) this.onComplete();
      });
  }
}

// Usage
const lotOfFilesForUpload = [{ uri: 'file://some-path' }, { uri: 'file://some-other-path' }];

const upload = (file) => console.log('fake uploading file: ', file);

const taskQueue = new TaskQueue({
  initialData: lotOfFilesForUpload,
  getId: file => file.uri,
  makeTask: file => upload(file),
  onComplete: () => console.log('Queue completed'),
});

// You can add more tasks dynamically
taskQueue.add({ uri: 'file://yet-another-file' });

2020 年 10 月的回答。Async/await 简而言之:只有 10 行代码+JSDoc。

/**
 * Same as Promise.all(items.map(item => task(item))), but it waits for
 * the first {batchSize} promises to finish before starting the next batch.
 *
 * @template A
 * @template B
 * @param {function(A): B} task The task to run for each item.
 * @param {A[]} items Arguments to pass to the task for each call.
 * @param {int} batchSize
 * @returns {B[]}
 */
async promiseAllInBatches(task, items, batchSize) {
    let position = 0;
    let results = [];
    while (position < items.length) {
        const itemsForBatch = items.slice(position, position + batchSize);
        results = [...results, ...await Promise.all(itemsForBatch.map(item => task(item)))];
        position += batchSize;
    }
    return results;
}