如何在 Node.js 处使用具有指定数量并行处理的 promise.all

How to use promise.all with a specified number of parallel processing at Node.js

我有一个承诺列表:[pr1, pr2, pr3, pr4, pr5]。因为每一个promise都占用大量的资源,所以我想一次,总是,只有指定数量的promise是运行.

我尝试了 Promise.allSettled(),我一次设置了 2 个承诺 运行 (pr1, pr2),但我必须等待 2 个承诺完成才能开始下一个 pr (pr3, pr4) ).我想如果 pr1 很快完成,pr3 将取代 pr1,现在 2 promises 是 (pr2, pr3),总是 2 promises 运行ning.

如果有人有任何解决方案,很高兴听到您的解决方案。 非常感谢。

I have a list of promises: [pr1, pr2, pr3, pr4, pr5]

如果您有 5 个承诺的列表,那么您已经有 5 个并行处理的事情。没有办法阻止它,因为您已经通过创建承诺触发了流程。

如果您只想一次处理其中两个,则需要不创建承诺。因此,您需要的是 return 承诺 的 5 函数列表,而不是 5 个承诺。

您需要的是 [f1, f2, f3, f4, f5] 的数组,其中 f1 将 return pr1f2 将 return pr2

一旦你有了这个,你需要做的就是一次Promise.all()两个承诺:

const tasks = [f1, f2, f3, f4, f5];

const BATCH_IN_PARALLEL = 2;

async function batchTasks() {
    for (let i=0; i<tasks.length;) {
        let promises = [];

        // create two promises at a time:
        for (let j=0; j<BATCH_IN_PARALLEL && i<tasks.length; i++,j++) {
            let t = tasks[i];
            promises.push(t()); // create the promise here!
        }

        await Promise.all(promises); // wait for the two promises
    }
}

如果您需要承诺的结果,只需将它们收集在一个数组中即可:

async function batchTasks() {
    let result = [];

    for (let i=0; i<tasks.length;) {
        let promises = [];

        // create two promises at a time:
        for (let j=0; j<BATCH_IN_PARALLEL && i<tasks.length; i++,j++) {
            let t = tasks[i];
            promises.push(t()); // create the promise here!
        }

        result.push(await Promise.all(promises));
    }

    return result;
}

以上是批处理的基本实现。它一次只处理两个异步函数,但它会等待两个异步函数完成,然后再处理两个。您可以发挥创意并在完成一个功能后立即处理另一个功能,但该功能的代码涉及更多。

async-q 库有一个函数可以做到这一点:asyncq.parallelLimit:

const asyncq = require('async-q');

const tasks = [f1, f2, f3, f4, f5];

let result = asyncq.parallelLimit(tasks, 2);

补充回答

这是我在我的一个项目中发现的一些旧代码,这些代码连续地并行处理两个任务。它使用递归函数来处理任务数组直到为空。如您所见,代码有点复杂,但并不难理解:

function batch (tasks, batch_in_parallel) {
    let len = tasks.length;
    
    return new Promise((resolve, reject)=>{
        let counter = len;

        function looper () {
            if (tasks.length != 0) {
                // remove task from the front of the array:
                // note: alternatively you can use .pop()
                //       to process tasks from the back
                tasks.shift()().then(()=>{
                    counter--;
                    if (counter) { // if we still have tasks
                        looper();  // process another task
                    }
                    else {
                        // if there are no tasks left we are
                        // done so resolve the promise:
                        resolve();
                    }
                });
            }
        }

        // Start parallel tasks:
        for (let i=0; i<batch_in_parallel; i++) {
            looper();
        }
    });
}

// Run two tasks in parallel:
batch([f1, f2, f3, f4, f5], 2).then(console.log('done');

请注意,上述函数没有 return 结果。您可以修改它以将结果收集到数组中,然后通过将结果传递给 resolve(result) return 结果,但要确保结果与任务的顺序相同。

现在我只会使用 asyncq.parallelLimit() 除非我真的不想导入整个 async-q 库或者我的 boss/client 不信任它。