使用流时 knex.js 上的内存问题

Memory issues on knex.js when using streams

我正在尝试使用 knex.js 将整个 sqlite3 数据库 table 导出到 CSV。由于 table 最多可以有 300000 行,我使用流来避免内存问题。但是,如果我查看我的应用程序的内存使用情况,它高达 800MB,或者我有一个 "out of memory" 错误。

如何在 sqlite3 数据库上使用 knex.js 处理大型查询结果?

下面是一个代码示例:

knex.select().from(table).stream(function (stream) {
    var stringifier = stringify(opts);
    var fileStream = fs.createWriteStream(file);

    var i = 0;
    stringifier.on('readable', function() {
      var row;
      while (row = stringifier.read()) {
        fileStream.write(row);
        console.log("row " + i++); //debug
      }
    });

    fileStream.once('open', function(fd) {
      stream.pipe(stringifier);
    });
});

编辑

sqlite3 数据库的 knex.js 流似乎是 "fake" 流。 下面是 knex 中 sqlite3 的流函数的源代码:

Runner_SQLite3.prototype._stream = Promise.method(function(sql, stream, options) {
    /*jshint unused: false*/
    var runner = this;
    return new Promise(function(resolver, rejecter) {
        stream.on('error', rejecter);
        stream.on('end', resolver);
        return runner.query(sql).map(function(row) {
            stream.write(row);
        }).catch(function(err) {
            stream.emit('error', err);
        }).then(function() {
            stream.end();
        });
    });
});

我们看到它在创建流之前等待请求执行 来自结果数组。

版本:

谢谢你的帮助。

我认为没有解决办法。我使用 limit 和 offset 通过 knex.js 逐步获取所有数据,并将每一行写入写入流。 对于那些想要的人的实施示例:

 exportTable: function(table, writeStream) {
    var totalRows;
    var rowLimit = _config.ROW_LIMIT;

    return DatabaseManager.countAll(table).then(function(count) {

        totalRows = count[0]['count(*)'];
        var iterations = new Array(Math.ceil(totalRows / rowLimit));

        return Promise.reduce(iterations, function(total, item, index) {

            return _knex.select().from(table).limit(rowLimit).offset(index * rowLimit).map(function(row) {
                writeStream.write(row);
            }).catch(function(err) {
                return Promise.reject(err);
            });

        }, 0).then(function() {
            return Promise.resolve();
        }).catch(function(err) {
            return Promise.reject(err);
        });

    }).catch(function(err) {
        console.log(err);
        return Promise.reject(err);
    });
}