Promise 队列中的 Bluebird 队列并发未按预期工作

Bluebird-Queue Concurrency in Promise Queues not working as expected

我正在通过 NodeJS 使用 bluebird-queue 将 HTTP 端点作为任务排队。每个任务都有一个 3 级 Promise 依赖关系,必须在完成之前解决。

一个任务

GET -> endpoint 1 // returns promise
    GET -> other endpoints in async // returns promise
        POST -> final endpoint // return promise

我使用 queue.add() 将 20,000 个这些任务放入蓝鸟队列,然后调用 queue.start()。所有错误都被捕获并且处理程序解决 Promise 因此任务可以完成。

我已将并发设置为 50。我最初的期望是队列将在任何给定时间处理 50 个,但它会等待前 50 个完成,然后再开始下一个 50 个。

不幸的是,其中一些请求最多可能需要 10 秒才能完成 - 如果单个请求需要更长的时间才能完成,则整个队列都会停止,直到 Promise 解决。

如果这是预期的行为,我可以如何 do/use 确保队列中的任务在任何给定时间最多处理 50 个任务,而不是一次处理 50 个任务?

这是我的配置设置:

var Queue = require('bluebird-queue'),
    queue = new Queue({
        concurrency: 50, 
        delay: 10, // ms
        interval: 1 // ms not quite sure what this means
    });

感谢任何帮助。

通读 bluebird-queue 的代码后,很明显您看到的有关并发的行为是预期的。我同意这有点令人惊讶。似乎没有任何方法可以获得所需的行为。

我建议尝试 promise-queue。根据对代码的快速阅读,它似乎会像您预期的那样工作。

您应该 post bluebird-queue 的一个问题,它不是由 Petka Antonov 编写的。这是一个自定义项目,目前还很原始。我很想玩它,因为你的问题听起来很有趣。我做了一个例子(如下),它产生了不同时间的工人来解决。它表明 bluebird-queue 的行为不一致。除了最后一组项目(如您所述)之外的所有项目都在一组中排队,所有下一组仅在上一组完成后才开始。但是,最后一组项目并非如此。您可以使用 Nconcurrency 来追踪

var Promise = require("bluebird");
var Queue = require('bluebird-queue');
function formatTime(date){
    function pad(value, width){
        width = width || 2;
        if(typeof value !== 'string') value = value.toString();
        if(value.length < width) value = new Array(width - value.length + 1).join('0') + value;
        return value;
    }
    return pad(date.getHours()) + ':' + pad(date.getMinutes()) + ':' + pad(date.getSeconds()) + ':' + pad(date.getMilliseconds(), 3);
}

var N = 20;
var start = function(){
    var queue = new Queue({
        concurrency: 4,
        delay: 10, // ms
        interval: 1 // ms not quite sure what this means
    });
    for(var i = 0; i < N; i++){
        var worker = function(number) {
            return new Promise(function (resolve, reject) {
                console.log(formatTime(new Date()) + ' Starting ' + number);
                setTimeout(function () {
                    console.log(formatTime(new Date()) + '   Finished ' + number);
                    resolve(number);
                }, 500 + (number % 2)*500);
            });
        }.bind(null, i);
        queue.add(worker);
    }
    queue.start().then(function(arg){
        console.log('All finished ' + arg);
    });
};