Node.js、PostgreSQL 中的事务冲突、乐观并发控制和事务重试
Node.js, transaction coflicts in PostgreSQL, optimistic concurrency control and transaction retries
我想使用 PostgreSQL transaction isolation to ensure data correctness with optimistic concurrency control 自动重试冲突事务的模式,而不是我的应用程序预先锁定数据库行和表。
实现此目的的一种常用方法是 Web 应用程序 a specific number of times within a code block or replays the HTTP request by a middleware layer, also known as HTTP request replay. Here is an example of such a middleware for Pyramid and Python web applications web。
我没有找到任何关于 Node.js 及其 PostgreSQL 驱动程序如何处理有两个并发事务正在进行且一个因读写冲突而无法通过的情况的有用信息。 PostgreSQL 将回滚其中一项事务,但这是如何向应用程序发出信号的?在Python 中,PSQL 驱动程序会在这种情况下引发psycopg2.extensions.TransactionRollbackError
。 For other SQL database drivers here are some exceptions they will raise.
当您将 SQL 事务隔离级别设置为 SERIALIZABLE 时,这种行为更为常见,因为您在负载下往往会遇到更多冲突,所以我想优雅地处理它,而不是将 HTTP 500 交给用户。
我的问题是:
如何使用 PostgreSQL 和一些常见的 ORM 框架(如 TypeORM)检测脏读回滚 - 如果需要特殊处理并且重试库不能独立?
是否有一个中间件(NestJS/Express.js/others)来处理这个问题,并在事务回滚时自动尝试重放 HTTP 请求 N 次数据库驱动程序?
以下是在使用使用 pg
library 的库(例如 TypeORM)时如何处理并发:
/**
* Check error code to determine if we should retry a transaction.
*
* See https://www.postgresql.org/docs/10/errcodes-appendix.html and
*
*/
function shouldRetryTransaction(err: unknown) {
const code = typeof err === 'object' ? String((err as any).code) : null
return code === '40001' || code === '40P01';
}
/**
* Using a repeatable read transaction throws an error with the code 40001
* "serialization failure due to concurrent update" if the user was
* updated by another concurrent transaction.
*/
async function updateUser(data: unknown) {
try {
return await this.userRepo.manager.transaction(
'REPEATABLE READ',
async manager => {
const user = manager.findOne(User, id);
// Modify user
// ...
// Save the user
await manager.save(user);
}
);
} catch (err) {
if (shouldRetryTransaction(err)) {
// retry logic
} else {
throw err;
}
}
}
对于重试事务,我建议使用诸如 async-retry
之类的库,它抽象了重试逻辑。
你会注意到这种模式非常适合简单的东西,但如果你想传递 manager
(例如,以便事务可以在其他服务中重用),那么这将变得非常麻烦。我建议使用 typeorm-transactional-cls-hooked
库,它利用连续本地存储来传播事务。
您可以通过以下方式重播 Express 应用程序的交易:
/**
* Request replay middleware
*/
import retry from 'async-retry';
function replayOnTransactionError(fn: (req, res, next) => unknown) {
return (req, res, next) => {
retry(bail => {
try {
// Call the actual handler
await fn(req, res, next);
} catch (err) {
if (!shouldRetryTransaction(err)) {
// Bail out if we're not supposed to retry anymore
return bail(err);
}
// Rethrow error to continue retrying
throw err;
}
}, {
factor: 2,
retries: 3,
minTimeout: 30,
});
}
}
app.put('/users/:id', replayOnTransactionError(async (req, res, next) => {
// ...
}))
我想使用 PostgreSQL transaction isolation to ensure data correctness with optimistic concurrency control 自动重试冲突事务的模式,而不是我的应用程序预先锁定数据库行和表。
实现此目的的一种常用方法是 Web 应用程序
我没有找到任何关于 Node.js 及其 PostgreSQL 驱动程序如何处理有两个并发事务正在进行且一个因读写冲突而无法通过的情况的有用信息。 PostgreSQL 将回滚其中一项事务,但这是如何向应用程序发出信号的?在Python 中,PSQL 驱动程序会在这种情况下引发psycopg2.extensions.TransactionRollbackError
。 For other SQL database drivers here are some exceptions they will raise.
当您将 SQL 事务隔离级别设置为 SERIALIZABLE 时,这种行为更为常见,因为您在负载下往往会遇到更多冲突,所以我想优雅地处理它,而不是将 HTTP 500 交给用户。
我的问题是:
如何使用 PostgreSQL 和一些常见的 ORM 框架(如 TypeORM)检测脏读回滚 - 如果需要特殊处理并且重试库不能独立?
是否有一个中间件(NestJS/Express.js/others)来处理这个问题,并在事务回滚时自动尝试重放 HTTP 请求 N 次数据库驱动程序?
以下是在使用使用 pg
library 的库(例如 TypeORM)时如何处理并发:
/**
* Check error code to determine if we should retry a transaction.
*
* See https://www.postgresql.org/docs/10/errcodes-appendix.html and
*
*/
function shouldRetryTransaction(err: unknown) {
const code = typeof err === 'object' ? String((err as any).code) : null
return code === '40001' || code === '40P01';
}
/**
* Using a repeatable read transaction throws an error with the code 40001
* "serialization failure due to concurrent update" if the user was
* updated by another concurrent transaction.
*/
async function updateUser(data: unknown) {
try {
return await this.userRepo.manager.transaction(
'REPEATABLE READ',
async manager => {
const user = manager.findOne(User, id);
// Modify user
// ...
// Save the user
await manager.save(user);
}
);
} catch (err) {
if (shouldRetryTransaction(err)) {
// retry logic
} else {
throw err;
}
}
}
对于重试事务,我建议使用诸如 async-retry
之类的库,它抽象了重试逻辑。
你会注意到这种模式非常适合简单的东西,但如果你想传递 manager
(例如,以便事务可以在其他服务中重用),那么这将变得非常麻烦。我建议使用 typeorm-transactional-cls-hooked
库,它利用连续本地存储来传播事务。
您可以通过以下方式重播 Express 应用程序的交易:
/**
* Request replay middleware
*/
import retry from 'async-retry';
function replayOnTransactionError(fn: (req, res, next) => unknown) {
return (req, res, next) => {
retry(bail => {
try {
// Call the actual handler
await fn(req, res, next);
} catch (err) {
if (!shouldRetryTransaction(err)) {
// Bail out if we're not supposed to retry anymore
return bail(err);
}
// Rethrow error to continue retrying
throw err;
}
}, {
factor: 2,
retries: 3,
minTimeout: 30,
});
}
}
app.put('/users/:id', replayOnTransactionError(async (req, res, next) => {
// ...
}))