具有大量查询的 node-postgres

node-postgres with massive amount of queries

我刚开始玩弄 node.js 和 postgres,使用 node-postgres。我尝试做的一件事是编写一个简短的 js 来填充我的数据库,使用一个包含大约 200,000 个条目的文件。

我注意到一段时间后(不到 10 秒),我开始得到 "Error: Connection terminated"。我不确定这是否与我使用 node-postgres 的方式有关,或者是否因为我在向 postgres 发送垃圾邮件。

无论如何,这是一个显示此行为的简单代码:

var pg = require('pg');
var connectionString = "postgres://xxxx:xxxx@localhost/xxxx";

pg.connect(connectionString, function(err,client,done){
  if(err) {
    return console.error('could not connect to postgres', err);
  }

  client.query("DROP TABLE IF EXISTS testDB");
  client.query("CREATE TABLE IF NOT EXISTS testDB (id int, first int, second int)");
  done();

  for (i = 0; i < 1000000; i++){
    client.query("INSERT INTO testDB VALUES (" + i.toString() + "," + (1000000-i).toString() + "," + (-i).toString() + ")",   function(err,result){
      if (err) {
         return console.error('Error inserting query', err);
      }
      done();
    });
  }
});

它在大约 18,000-20,000 次查询后失败。这是使用 client.query 的错误方法吗?我尝试更改默认客户端编号,但似乎无济于事。

client.connect() 似乎也没有帮助,但那是因为我有太多客户,所以我绝对认为客户池是可行的方法。

感谢您的帮助!

我猜您即将达到最大池大小。由于 client.query 是异步的,很可能所有可用的连接在返回之前都会被使用。

默认池大小为 10。请在此处查看:https://github.com/brianc/node-postgres/blob/master/lib/defaults.js#L27

您可以通过设置 pg.defaults.poolSize 来增加默认池大小:

pg.defaults.poolSize = 20;

更新:释放连接后执行另一个查询。

var pg = require('pg');
var connectionString = "postgres://xxxx:xxxx@localhost/xxxx";
var MAX_POOL_SIZE = 25;

pg.defaults.poolSize = MAX_POOL_SIZE;
pg.connect(connectionString, function(err,client,done){
  if(err) {
    return console.error('could not connect to postgres', err);
  }

  var release = function() {
    done();
    i++;
    if(i < 1000000)
      insertQ();
  };

  var insertQ = function() {
    client.query("INSERT INTO testDB VALUES (" + i.toString() + "," + (1000000-i).toString() + "," + (-i).toString() + ")",        function(err,result){
      if (err) {
         return console.error('Error inserting query', err);
      }
      release();
    });
  };

  client.query("DROP TABLE IF EXISTS testDB");
  client.query("CREATE TABLE IF NOT EXISTS testDB (id int, first int,    second int)");
  done();

  for (i = 0; i < MAX_POOL_SIZE; i++){
    insertQ();
  }
});

基本思路是,由于您使用相对较小的连接池大小对大量查询进行排队,因此您将达到最大池大小。此处我们仅在释放现有连接后才进行新查询。

更新

此答案已被这篇文章取代:Data Imports,它代表了最新的方法。


为了复制您的场景,我使用了 pg-promise 库,我可以确认正面尝试永远不会成功,无论您使用哪个库,重要的是方法。

下面是一种修改后的方法,我们将插入分成块,然后在事务中执行每个块,这就是负载平衡(也称为节流):

function insertRecords(N) {
    return db.tx(function (ctx) {
        var queries = [];
        for (var i = 1; i <= N; i++) {
            queries.push(ctx.none('insert into test(name) values()', 'name-' + i));
        }
        return promise.all(queries);
    });
}
function insertAll(idx) {
    if (!idx) {
        idx = 0;
    }
    return insertRecords(100000)
        .then(function () {
            if (idx >= 9) {
                return promise.resolve('SUCCESS');
            } else {
                return insertAll(++idx);
            }
        }, function (reason) {
            return promise.reject(reason);
        });
}
insertAll()
    .then(function (data) {
        console.log(data);
    }, function (reason) {
        console.log(reason);
    })
    .done(function () {
        pgp.end();
    });

这在大约 4 分钟内产生了 1000,000 条记录,在前 3 个事务之后速度急剧下降。我使用的是 Node JS 0.10.38(64 位),它消耗了大约 340MB 的内存。这样我们插入了10万条记录,连续10次。

如果我们这样做,只是这次在 100 个事务中插入 10,000 条记录,同样的 1,000,000 条记录在 1 分 25 秒内添加,没有减速,Node JS 消耗大约 100MB 的内存,这告诉我们分区数据这样是个好主意。

不管你使用哪个库,方法应该是一样的:

  1. Partition/throttle 您插入到多个事务中;
  2. 将单个事务中的插入列表保持在大约 10,000 条记录;
  3. 在同步链中执行所有交易。
  4. 在每个事务的 COMMIT 之后将连接释放回池中。

如果您违反任何这些规则,您肯定会遇到麻烦。例如,如果你违反了规则 3,你的 Node JS 进程可能会 运行 内存不足并抛出错误。我的示例中的规则 4 由图书馆提供。

如果您遵循此模式,则无需为连接池设置而烦恼。

更新 1

pg-promise以后版本完美支持此类场景,如下图:

function factory(index) {
    if (index < 1000000) {
        return this.query('insert into test(name) values()', 'name-' + index);
    }
}

db.tx(function () {
    return this.batch([
        this.none('drop table if exists test'),
        this.none('create table test(id serial, name text)'),
        this.sequence(factory), // key method
        this.one('select count(*) from test')
    ]);
})
    .then(function (data) {
        console.log("COUNT:", data[3].count);
    })
    .catch(function (error) {
        console.log("ERROR:", error);
    });

如果你不想包含任何额外的东西,比如 table 创建,那么它看起来更简单:

function factory(index) {
    if (index < 1000000) {
        return this.query('insert into test(name) values()', 'name-' + index);
    }
}

db.tx(function () {
    return this.sequence(factory);
})
    .then(function (data) {
        // success;
    })
    .catch(function (error) {
        // error;
    });

有关详细信息,请参阅 Synchronous Transactions

例如,使用 Bluebird 作为 promise 库,在我的生产机器上插入 1,000,000 条记录(没有启用长堆栈跟踪)需要 1 分 43 秒。

您将根据 indexfactory 方法 return 请求,直到剩下 none,就这么简单。

最棒的是,这不仅速度快,而且对您的 NodeJS 进程造成的负载很小。内存测试过程在整个测试过程中保持在 60MB 以下,仅消耗 CPU 时间的 7-8%。

更新 2

从1.7.2版本开始,pg-promise轻松支持超海量交易。请参阅第 Synchronous Transactions.

例如,在 Windows 8.1 64 位 Windows 64 位

的家用电脑上,我可以在 15 分钟内在单个事务中插入 10,000,000 条记录。

为了测试,我将 PC 设置为生产模式,并使用 Bluebird 作为 promise 库。在测试期间,整个 NodeJS 0.12.5 进程(64 位)的内存消耗没有超过 75MB,而我的 i7-4770 CPU 显示一致的 15% 负载。

以同样的方式插入 100m 的记录只需要更多的耐心,而不需要更多的计算机资源。

与此同时,之前对 1m 插入的测试从 1m43s 下降到 1m31s。

更新 3

以下注意事项可能会产生巨大的差异:Performance Boost

更新 4

相关问题,有更好的实现示例: .

更新 5

可以在此处找到更好、更新的示例: