AWS lambda 中的 pg-promise

pg-promise in AWS lambda

我在将 pg-promise 与 AWS lambda 结合使用时遇到了很多问题。我想了解如何解决这些问题。

库建议您创建数据库对象的一个​​实例,然后将其从模块中导出。应该只创建对象的一个​​实例。 类似于:

const db = pgp({
  host: process.env.DATABASE_HOST,
  port: process.env.DATABASE_PORT,
  database: process.env.DATABASE_NAME,
  user: process.env.DATABASE_USERNAME,
  password: process.env.DATABASE_PASSWORD,
  poolSize: 0,
  poolIdleTimeout: 10,
});
module.exports = db;

我知道这只是一个对象,这里没有创建任何连接。当您 运行 此数据库对象上的任何内容(例如 db.query().

时,将延迟创建连接

由于我们将池大小设置为 0,因此只会创建一个连接。这就是我们需要的,因为在每个 lambda 函数开始时我们需要创建一个连接,然后在 lambda 完成时关闭连接。

我们面临的问题是:

  1. 我们如何释放连接? AWS lambda 重用容器。这意味着它将调用已经初始化的相同节点代码,并且如果在先前的 运行 之后不久调用 lambda,则重新 运行 相同的函数。 这意味着 db 对象对于 lambda 的下一次调用将是相同的。在我们的第一个 lambda 完成后,如果我们调用 pgp.end() 文档说连接池关闭。它还说在那之后我们不能在同一进程中使用 pgp 库。但是该库将被使用,因为 db 对象仍然存在,并将在随后的 运行.

  2. 中使用
  3. 我们如何重试获取新连接?
    AWS lambda 带来的另一个问题是,当您 运行 VPC 中的 lambda 并且您的 Postgres 实例也在 VPC 中 运行ning 时,postgres 数据库的 DNS 需要时间来解析。因此,如果您尝试连接,您可能会收到 ENOTFOUND 错误。 AWS 的建议是重试获取连接。使用 pg-promise 我们如何重试获取连接?

我想实现的方式是:

module.exports.handler = (event, context, callback) => {
 let connection;
 try {
  connection = /*gets connection and retries if it failed the first time*/
  // run db queries and transactions.. etc.
  callback(null, result);
 } finally {
  connection.close();
 }

}

How do we release a connection?

你不知道。连接自动与连接池通信。一旦查询结束执行,连接将返回池中,以供下一个请求它的查询使用。

一次执行多个查询时,您应该使用任务。参见 Chaining Queries

How do we retry getting a new connection?

连接池根据连接需求和最大池大小,在需要时自动创建新的物理连接。当连接断开时,它会自动重新创建。


如果您在 AWS lambda 中只有一个连接,那么也许最适合您的模式是创建和维护一个全局连接?

如果是这样,Robust Listeners 示例可能对您有用。它展示了如何在连接池之外维护单个全局连接,以及如何使其始终保持活动状态。

但这更像是最后的手段。我相信简单地使用自动连接池就足够了。

这就是我们最终要做的事情。

关键是在每个 lambda 开始之前创建一个新连接,然后在从 lambda 返回之前关闭它

// your lambda entry point
module.exports.handler = (event, context, callback) =>  
getConnection(async (connection) => {
    let result;
    try {
        // work with your connection
    } catch (error) {
    }
    callback(null, result);
})


// db connection 

const getConnection = async (callback) => {
const dbConnection = new DBConnection();
try {
    const connection = await dbConnection.create();
    await callback(connection);
} finally {
    dbConnection.close();
}
};

const MAX_RETRY = 3;

const options = {
// global event notification;
error: (error, e) => {
    if (e.cn) {
    // A connection-related error;
    //
    // Connections are reported back with the password hashed,
    // for safe errors logging, without exposing passwords.
    logger.error('CN:', e.cn);
    logger.error('EVENT:', error.message || error);
    }
},
};

const pgp = require('pg-promise')(options);

const connectionParams = {
host: process.env.DATABASE_HOST,
port: process.env.DATABASE_PORT,
database: process.env.DATABASE_NAME,
user: process.env.DATABASE_USERNAME,
password: process.env.DATABASE_PASSWORD,
poolSize: 0,
poolIdleTimeout: 10,
};

const db = pgp(connectionParams);

class DBConnection {

async create() {
    let retry = 0;
    while (retry < MAX_RETRY) {
    try {
        logger.debug(`Acquiring a new DB connection Attempt: ${retry}/${MAX_RETRY}`);
        this.connection = await db.connect({ direct: true });
        break;
    } catch (error) {
        logger.error(`Error occurred while getting DB connection ${error}. Retrying ${retry}/${MAX_RETRY}`);
        retry += 1;
    }
    }

    if (!this.connection) {
    throw Error(`Unable to obtain DB connection after ${MAX_RETRY} retries`);
    }

    return this.connection;
}

close() {
    if (this.connection) {
    logger.debug('Closing DB Connection');
    this.connection.done();
    }
}
}