节点调用带有临时表的 postgres 函数导致 "memory leak"

Node calling postgres function with temp tables causing "memory leak"

我有一个 node.js 程序调用 Postgres(Amazon RDS 微实例)函数,get_jobs 在事务中,使用 brianc 的 node-postgres 包每秒 18 次。

节点代码只是brianc's basic client pooling example的加强版,大致像...

var pg = require('pg');
var conString = "postgres://username:password@server/database";

function getJobs(cb) {
  pg.connect(conString, function(err, client, done) {
    if (err) return console.error('error fetching client from pool', err);
    client.query("BEGIN;");
    client.query('select * from get_jobs()', [], function(err, result) {
      client.query("COMMIT;");
      done(); //call `done()` to release the client back to the pool
      if (err) console.error('error running query', err);
      cb(err, result);
    });
  });
}

function poll() {
  getJobs(function(jobs) {
    // process the jobs
  });
  setTimeout(poll, 55);
}

poll(); // start polling

所以 Postgres 得到:

2016-04-20 12:04:33 UTC:172.31.9.180(38446):XXX@XXX:[5778]:LOG:  statement: BEGIN;
2016-04-20 12:04:33 UTC:172.31.9.180(38446):XXX@XXX:[5778]:LOG:  execute <unnamed>: select * from get_jobs();
2016-04-20 12:04:33 UTC:172.31.9.180(38446):XXX@XXX:[5778]:LOG:  statement: COMMIT;

...每 55 毫秒重复一次。

get_jobs是用temp写的tables,像这样

CREATE OR REPLACE FUNCTION get_jobs (
) RETURNS TABLE (
  ...
) AS 
$BODY$
DECLARE 
  _nowstamp bigint; 
BEGIN

  -- take the current unix server time in ms
  _nowstamp := (select extract(epoch from now()) * 1000)::bigint;  

  --  1. get the jobs that are due
  CREATE TEMP TABLE jobs ON COMMIT DROP AS
  select ...
  from really_big_table_1 
  where job_time < _nowstamp;

  --  2. get other stuff attached to those jobs
  CREATE TEMP TABLE jobs_extra ON COMMIT DROP AS
  select ...
  from really_big_table_2 r
    inner join jobs j on r.id = j.some_id

  ALTER TABLE jobs_extra ADD PRIMARY KEY (id);

  -- 3. return the final result with a join to a third big table
  RETURN query (

    select je.id, ...
    from jobs_extra je
      left join really_big_table_3 r on je.id = r.id
    group by je.id

  );

END
$BODY$ LANGUAGE plpgsql VOLATILE;

我使用了 the temp table pattern,因为我知道 jobs 总是从 really_big_table_1 中提取的一小部分行,希望这比单个查询具有更好的扩展性多个连接和多个 where 条件。 (我在 SQL 服务器上使用它效果很好,我现在不信任任何查询优化器,但请告诉我这是否是 Postgres 的错误方法!)

查询 运行s 在 8 毫秒内 tables(从节点测量),在下一个作业开始之前有足够的时间完成一项作业 "poll"。

问题:以这种速度轮询大约 3 小时后,Postgres 服务器 运行 内存不足并崩溃。

我已经试过了...

那么ON COMMIT DROP是不是释放了所有的关联资源?还有什么可以保持记忆?如何释放它?

I used this to great effect with SQL Server and I don't trust any query optimiser now

那就别用了。您仍然可以直接执行查询,如下所示。

but please tell me if this is the wrong approach for Postgres!

这不是一个完全错误的方法,它只是一个非常笨拙的方法,因为您正在尝试创建其他人已经实现的东西以便更容易使用。结果,您犯了很多错误,这些错误可能会导致很多问题,包括内存泄漏。

与使用 pg-promise 的完全相同示例的简单性比较:

var pgp = require('pg-promise')();
var conString = "postgres://username:password@server/database";
var db = pgp(conString);

function getJobs() {
    return db.tx(function (t) {
        return t.func('get_jobs');
    });
}

function poll() {
    getJobs()
        .then(function (jobs) {
            // process the jobs
        })
        .catch(function (error) {
            // error
        });

    setTimeout(poll, 55);
}

poll(); // start polling

使用 ES6 语法时变得更简单:

var pgp = require('pg-promise')();
var conString = "postgres://username:password@server/database";
var db = pgp(conString);

function poll() {
    db.tx(t=>t.func('get_jobs'))
        .then(jobs=> {
            // process the jobs
        })
        .catch(error=> {
            // error
        });

    setTimeout(poll, 55);
}

poll(); // start polling

在你的示例中我唯一不太理解的是使用事务来执行单个 SELECT。这不是事务通常的用途,因为您没有更改任何数据。我假设您正在尝试缩小您拥有的一段真实代码,该代码也会更改一些数据。

如果您不需要交易,您的代码可以进一步简化为:

var pgp = require('pg-promise')();
var conString = "postgres://username:password@server/database";
var db = pgp(conString);

function poll() {
    db.func('get_jobs')
        .then(jobs=> {
            // process the jobs
        })
        .catch(error=> {
            // error
        });

    setTimeout(poll, 55);
}

poll(); // start polling

更新

但是,不控制上一个请求的结束是一种危险的方法,这也可能会产生 memory/connection 问题。

安全的方法应该是:

function poll() {
    db.tx(t=>t.func('get_jobs'))
        .then(jobs=> {
            // process the jobs

            setTimeout(poll, 55);
        })
        .catch(error=> {
            // error

            setTimeout(poll, 55);
        });
}

使用 CTE 创建部分结果集而不是临时表。

CREATE OR REPLACE FUNCTION get_jobs (
) RETURNS TABLE (
  ...
) AS 
$BODY$
DECLARE 
  _nowstamp bigint; 
BEGIN

  -- take the current unix server time in ms
  _nowstamp := (select extract(epoch from now()) * 1000)::bigint;  

  RETURN query (

    --  1. get the jobs that are due
    WITH jobs AS (

      select ...
      from really_big_table_1 
      where job_time < _nowstamp;

    --  2. get other stuff attached to those jobs
    ), jobs_extra AS (

      select ...
      from really_big_table_2 r
        inner join jobs j on r.id = j.some_id

    ) 

    -- 3. return the final result with a join to a third big table
    select je.id, ...
    from jobs_extra je
      left join really_big_table_3 r on je.id = r.id
    group by je.id

  );

END
$BODY$ LANGUAGE plpgsql VOLATILE;

规划器将按照我希望使用临时表实现的方式按顺序评估每个块。

我知道这并不能直接解决内存泄漏问题(我很确定 Postgres 对它们的实现有问题,至少它们在 RDS 配置上的表现方式是这样)。

但是,查询有效,它是按我预期的方式计划的查询,并且在 运行 作业 3 天后内存使用现在稳定,而且我的服务器没有崩溃。

我根本没有更改节点代码。