Node.js、PostgreSQL 中的事务冲突、乐观并发控制和事务重试

Node.js, transaction coflicts in PostgreSQL, optimistic concurrency control and transaction retries

我想使用 PostgreSQL transaction isolation to ensure data correctness with optimistic concurrency control 自动重试冲突事务的模式,而不是我的应用程序预先锁定数据库行和表。

实现此目的的一种常用方法是 Web 应用程序 a specific number of times within a code block or replays the HTTP request by a middleware layer, also known as HTTP request replay. Here is an example of such a middleware for Pyramid and Python web applications web

我没有找到任何关于 Node.js 及其 PostgreSQL 驱动程序如何处理有两个并发事务正在进行且一个因读写冲突而无法通过的情况的有用信息。 PostgreSQL 将回滚其中一项事务,但这是如何向应用程序发出信号的?在Python 中,PSQL 驱动程序会在这种情况下引发psycopg2.extensions.TransactionRollbackErrorFor other SQL database drivers here are some exceptions they will raise.

当您将 SQL 事务隔离级别设置为 SERIALIZABLE 时,这种行为更为常见,因为您在负载下往往会遇到更多冲突,所以我想优雅地处理它,而不是将 HTTP 500 交给用户。

我的问题是:

以下是在使用使用 pg library 的库(例如 TypeORM)时如何处理并发:

/**
 * Check error code to determine if we should retry a transaction.
 * 
 * See https://www.postgresql.org/docs/10/errcodes-appendix.html and
 * 
 */
function shouldRetryTransaction(err: unknown) {
  const code = typeof err === 'object' ? String((err as any).code) : null
  return code === '40001' || code === '40P01';
}

/**
 * Using a repeatable read transaction throws an error with the code 40001 
 * "serialization failure due to concurrent update" if the user was 
 * updated by another concurrent transaction.
 */
async function updateUser(data: unknown) {
  try {
    return await this.userRepo.manager.transaction(
      'REPEATABLE READ',
      async manager => {
        const user = manager.findOne(User, id);
    
        // Modify user
        // ...
    
        // Save the user
        await manager.save(user);
      }
    );
  } catch (err) {
    if (shouldRetryTransaction(err)) {
      // retry logic     
    } else {
      throw err;
    }
  }
}

对于重试事务,我建议使用诸如 async-retry 之类的库,它抽象了重试逻辑。

你会注意到这种模式非常适合简单的东西,但如果你想传递 manager(例如,以便事务可以在其他服务中重用),那么这将变得非常麻烦。我建议使用 typeorm-transactional-cls-hooked 库,它利用连续本地存储来传播事务。

您可以通过以下方式重播 Express 应用程序的交易:

/**
 * Request replay middleware
 */

import retry from 'async-retry';

function replayOnTransactionError(fn: (req, res, next) => unknown) {
  return (req, res, next) => {
    retry(bail => {
      try {
        // Call the actual handler
        await fn(req, res, next);
      } catch (err) {
        if (!shouldRetryTransaction(err)) {
          // Bail out if we're not supposed to retry anymore
          return bail(err);
        }

        // Rethrow error to continue retrying
        throw err;
      }
    }, {
      factor: 2,
      retries: 3,
      minTimeout: 30,
    });
  }
}

app.put('/users/:id', replayOnTransactionError(async (req, res, next) => {
  // ...
}))