异步迭代 JavaScript 中的大量数组而不触发超出堆栈大小

asynchronously iterate over massive array in JavaScript without triggering stack size exceeded

我的环境是 NodeJS,虽然这也可能是一个与网络相关的问题。我有大量来自数据库的数据,我正试图对其进行枚举。但是,为了争论起见,假设我有一个包含 20,000 个字符串的数组:

var y = 'strstrstrstrstrstrstrstrstrstr';
var x = [];
for(var i = 0; i < 20000; i++)
  x.push(y);

我想异步枚举这个列表,假设使用 async library,并且假设因为我非常谨慎,我什至将我的枚举一次限制为 5 次迭代:

var allDone = function() { console.log('done!') };
require('async').eachLimit(x, 5, function(item, cb){
  ...
  someAsyncCall(.., cb);
}, allDone);

预期 x 的 5 个项目将在上面并发迭代,最终将迭代所有 20,000 个项目并且控制台将打印 'done!'。实际发生的是:

Uncaught exception: [RangeError: Maximum call stack size exceeded]

此时我假设这一定是异步库的某种错误,所以我编写了自己的 eachLimit 版本如下:

function eachLimit(data, limit, iterator, cb) {
    var consumed = 0;
    var consume;
    var finished = false;
    consume = function() {
        if(!finished && consumed >= data.length) {
            finished = true;
            cb();
        }else if(!finished) {
            return iterator(data[consumed++], consume);
        }
    };
    var concurrent = limit > data.length ? data.length : limit;
    for(var i = 0; i < concurrent; i++)
        consume();
}

有趣的是,这解决了我的问题。但是当我将我的实验从 nodeJS 移到 Chrome 时,即使使用上面的解决方案,我仍然收到超出的堆栈大小。

很明显,我的方法不会将堆栈增加到异步中包含的 eachLimit 方法那么大。但是,我仍然认为我的方法不好,因为可能不适用于 20k 项,但对于某些大小的数组,我仍然可以使用我的方法超过堆栈大小。我觉得我需要使用尾递归设计某种解决方案来解决这个问题,但我不确定 v8 是否会针对这种情况进行优化,或者是否有可能考虑到这个问题。

您考虑过为此使用 promise 吗?他们应该解决不断增加的堆栈的问题(而且你还可以使用承诺,这在我的书中是一个很大的优势):

// Here, iterator() should take a single data value as input and return
// a promise for the asynchronous behavior (if it is asynchronous)
// or any value if it is synchronous
function eachLimit(data, limit, iterator) {
    return Promise(function (resolve, reject) {
        var i = 0;
        var failed = false;

        function handleFailure(error) {
            failed = true;
            reject(error);
        }

        function queueAction() {
            try {
                Promise.when(iterator(data[i]))
                .then(handleSuccess, handleFailure);
            } catch (error) {
                reject(error);
            }
        }

        function handleSuccess() {
            if (!failed) {
                if (i < data.length) {
                    queueAction();
                    i += 1;
                } else {
                    resolve();
                }
            }
        }

        for (; i < data.length && i < limit; i += 1) {
            queueAction();
        }
    });
}

I feel like I need to design some sort of solution to this problem using tail recursion, but I'm not sure if v8 will even optimize for this case, or if it's possible given the problem.

您正在使用的连续传递样式已经是尾递归的(或接近尾递归)。问题是大多数 JS 引擎确实倾向于在这种情况下进行 Whosebugs。

有两种主要方法可以解决此问题:

1) 使用 setTimeout 强制代码异步。

您的代码发生的情况是您在原始函数 return 之前调用了 return 回调。在某些异步库中,这最终会导致计算器溢出。一个简单的解决方法是通过将回调包装在 setTimeout 中,仅在事件处理循环的下一次迭代中强制回调到 运行。翻译

//Turns out this was actually "someSyncCall"...
someAsyncCall(.., cb);

进入

someAsyncCall(..., function(){
    setTimeout(cb, 0)
});

这里的主要优点是操作起来非常简单。缺点是这会给你的循环增加一些延迟,因为 setTimeout 的实现使得回调总是有一些非零延迟(即使你将它设置为零)。在服务器上你也可以使用 nextTick(或类似的东西,忘了确切的名字)来做类似的事情。

也就是说,拥有大量顺序异步操作循环已经有点奇怪了。如果您的操作实际上都是异步的,那么由于网络延迟,它需要数年才能完成。

2) 使用 trampolining 处理同步代码。

100% 避免 Whosebug 的唯一方法是使用真正的 while 循环。有了 promises,编写以下伪代码会更容易一些:

//vastly incomplete pseudocode
function loopStartingFrom(array, i){
    for(;i<array.length; i++){
        var x = run_next_item(i);
        if(is_promise(x)){
            return x.then(function(){
                loopStartingFrom(array, i+1)
            });
        }
    }
}

基本上,您 运行 您在实际循环中的循环,使用某种方法来检测您的迭代之一是立即 returning 还是推迟到异步计算。当事情 return 立即保持循环 运行ning 并且当你最终获得真正的异步结果时,你停止循环并在异步迭代结果完成时恢复它。

使用蹦床的缺点是它有点复杂。也就是说,有一些异步库可以保证不会发生 Whosebug(通过使用我在幕后提到的两个技巧之一)。

为了防止堆栈溢出,您需要避免 consume 递归到自身。您可以使用一个简单的标志来做到这一点:

function eachLimit(data, limit, iterator, cb) {
    var consumed = 0,
        running = 0,
        isAsync = true;
    function consume() {
        running--;
        if (!isAsync)
            return;
        while (running < limit && consumed < data.length) {
            isAsync = false;
            running++;
            iterator(data[consumed++], consume);
            isAsync = true;
        }
        if (running == 0)
            cb();
    }
    running++;
    consume();
}