使用数据库查找在节点中导出大文件 - 避免多次数据库调用?

Exporting large file in node with database lookup - Avoid multiple DB calls?

我是 Node 的新手,虽然我认为它非常适合服务类型的应用程序,但是当我将它用于需要回调时像数据导出应用程序一样简单地从头到尾运行的应用程序时,我遇到了一些问题用于访问数据库或其他类似的东西。

这是我当前的设置。

我有一个脚本可以将数据从 MongoDB 导出到 XML 文件中,以便在单独的进程中使用。导出脚本很简单:

db.getData(function(err, data) {
    data.forEach(function(entry) {
        // write the data to the file
        writeData(entry);
    });
});

问题是当我需要在导出过程中进行非同步调用时,例如:

db.getData(function(err, data) {
    data.forEach(function(entry) {

        var cacheValue = cache.get(entry.someOtherId);

        if (cacheValue) {
            // write the value from the cache
            writeData(entry, cacheValue);
        }

        else {

            // THIS IS CALLED 1000's OF TIMES EVEN THOUGH THE FIRST FEW CALLS
            // SHOULD POPULATE THE CACHE

            db.getLookup(entry.someOtherId, function(err, value) {

                // store it in the cache to avoid db calls
                cache.store(entry.someOtherId, value);

                // write the data to the file after getting the lookup
                writeData(entry, value);
            });
        }
    });
});

由于节点的非阻塞性质,当 getLookup 正在执行时,主 forEach 循环将继续,并且因为 entry.someOtherId 字段是一个查找,通常它会包含与另一条记录相同的值。

所以发生的情况是,对于一个查找量相对较少的大文件,在第一个有机会 return 并存储缓存中的值。

不需要预加载

我知道我可以简单地重新加载缓存,因为查找 table 相当小,但是对于重新缓存所有值不切实际的较大查找,应该如何解决这个问题?

暂停主循环

在同步环境中,这很简单,主循环将停止,直到数据库值被 returned,所以下一次该值已经在缓存中。

我知道有各种库会尝试停止线程执行,直到回调 returns 但这似乎与 Node 的本质背道而驰。

谁能告诉我在 Node 中处理此类情况的普遍接受模式是什么?

我想我现在真正理解了这个表达callback hell

事实证明(这并不奇怪)这一切都需要在回调和递归函数中完成,所以下一个条目在上一个条目完成之前不会开始:

使用此处描述的方法: Using recursive pattern loop with node.js

正在处理一个值数组,该数组与一个索引一起传递给一个函数,当该索引的值已被处理后,它会使用索引 index+1 调用自身:

function processEntry(entries, index, next) {

    // no more entries to run
    if (index >= entries.length) {
        next();
        return;
    }

    var cacheValue = cache.get(entry.someOtherId);

    if (cacheValue) {

        // write the value from the cache
        writeData(entry, cacheValue);

        // process the next entry
        process.nextTick(function() {
            processEntry(entries, index+1, next);
        });
    }

    else {

        db.getLookup(entry.someOtherId, function(err, value) {

            // store it in the cache to avoid db calls
            cache.store(entry.someOtherId, value);

            // write the data to the file after getting the lookup
            writeData(entry, value);

            // process the next entry
            processEntry(entries, index+1, next);
        });
    }

}

避免堆栈溢出

此设置的问题在于,一旦缓存已填充,我们将开始直接从 processEntry 中调用 processEntry,而不是从不同的回调堆栈调用,所以不久我们就会得到一个堆栈溢出。

为了避免这种情况,我们需要告诉 Node 使用 process.nextTick() 创建一个新堆栈 http://nodejs.org/api/process.html#process_process_nexttick_callback

On the next loop around the event loop call this callback. This is not a simple alias to setTimeout(fn, 0), it's much more efficient. It typically runs before any other I/O events fire, but there are some exceptions. See process.maxTickDepth below.

根据文档,此调用相当有效

我建议使用 promise 库和记忆函数来解决并行处理多个异步操作的任务 运行。

对于以下示例,我使用的是 bluebird。您的整个循环,包括结果缓存,都可以简化为这段排列清晰的代码:

var db = Promise.promisifyAll(db);
var lookup = memoize(db.getLookupAsync, db);

entries.forEach(function (entry) {
    lookup(entry.someOtherId).then(function (value) {
        writeData(entry, value);
    });
});

其中 memoize 是缓存函数结果的通用辅助函数:

function memoize(func, thisArg) {
    var cache = {};
    return function memoize(id) {
        if (!cache.hasOwnProperty(id)) {
            cache[id] = func.apply(thisArg || this, arguments);
        }
        return cache[id];
    };
}

所以lookup()是一个调用promisified版本db.getLookup()的函数(bluebird的.promisifyAll()创建了...Async()版本对象中的所有函数)并记住相应的结果。

一个 promisified 函数 returns 一个在数据可用时立即解析(即调用其 .then() 回调)的 promise,或者立即(如果之前已经解析)。换句话说,我们可以缓存一个 promise 并根据需要随时调用 .then()

有了这个设置,我们就拥有了解决您处理异步函数调用的任务所需的一切,同时缓存它们的结果以尽可能快地保持进程。此外,与 "callback hell".

相比,它阅读起来更愉快、更直接。

看看 http://jsfiddle.net/Tomalak/91bdb5ns/,在那里你可以看到它的工作原理。

请注意,我的代码中没有错误处理。您应该阅读 the bluebird docs 并自己添加。