如何使用 Spring 重试 PostgreSQL 可序列化事务?

How to retry a PostgreSQL serializable transaction with Spring?

我正在针对 PostgreSQL v12 数据库进行开发。我正在使用 SERIALIZABLE 交易。一般的想法是,当 PostgreSQL 检测到序列化异常时,应该重试完整的事务。

我正在使用 Spring 的 AbstractFallbackSQLExceptionTranslator to translate database exceptions to Spring's exception classes. This exception translator should translate the PostgreSQL error 40001/serialization_failure to a ConcurrencyFailureException. Spring JDBC maintains a mapping file 将 PostgreSQL 特定代码 40001 映射到数据库异常的通用 cannotSerializeTransactionCodes class,这为 API 用户转换为 ConcurrencyFailureException

我的想法是依靠 Spring 重试项目来重试由于以下序列化错误而暂停的 SERIALIZABLE 事务:

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Retryable(include = ConcurrencyFailureException.class, maxAttempts = ..., backoff = ...)
@Transactional(isolation = Isolation.SERIALIZABLE)
public @interface SerializableTransactionRetry {
}

在服务实现中,我会简单地将 @Transactional 替换为 @SerializableTransactionRetry 并完成它。

现在,回到 PostgreSQL。本质上,可以在两个阶段检测到序列化异常:

  1. 语句执行期间
  2. 在事务的提交阶段

似乎 Spring 的 AbstractFallbackSQLExceptionTranslator 正确地翻译了一个在语句执行期间检测到的序列化异常,但在提交阶段未能翻译。考虑以下堆栈跟踪:

org.springframework.transaction.TransactionSystemException: Could not commit JDBC transaction; nested exception is org.postgresql.util.PSQLException: ERROR: could not serialize access due to read/write dependencies among transactions
  Detail: Reason code: Canceled on identification as a pivot, during commit attempt.
  Hint: The transaction might succeed if retried.
    at org.springframework.jdbc.datasource.DataSourceTransactionManager.doCommit(DataSourceTransactionManager.java:332)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:746)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:714)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:533)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:304)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.retry.interceptor.RetryOperationsInterceptor.doWithRetry(RetryOperationsInterceptor.java:91)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:164)
    at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:118)
    at org.springframework.retry.annotation.AnnotationAwareRetryOperationsInterceptor.invoke(AnnotationAwareRetryOperationsInterceptor.java:153)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)

如您所见,PostgreSQL 检测到序列化异常 (ERROR: could not serialize access due to ...),但这被 Spring 转换为 TransactionSystemException 而不是 ConcurrencyFailureException

我可以更改上面的 SerializableTransactionRetry 注释以包含 TransactionSystemException,但我认为这是错误的,因为现在我们将重试任何类型的交易错误,这是这不是我们想要的。

这是Spring的AbstractFallbackSQLExceptionTranslator的缺点吗?我正在使用 Spring 5.2.1.

https://github.com/spring-projects/spring-framework/issues/24064#issuecomment-557800496, the SQLExceptionTranslator 中所述,实际上不用于提交阶段发生的 SQL 异常。

在同一张票中,有人提议在 Spring 5.3 中引入它(我相信它会在 2020 年第二季度的某个地方发布)。

如果您正在使用 spring-boot,您可以创建自定义 DataSourceTransactionManager 并在其 doCommit 方法中抛出 ConcurrencyFailureException 子类之一,如果捕获的异常是 sql 错误代码是 40001.


import java.sql.SQLException;
import javax.sql.DataSource;
import org.springframework.context.annotation.Bean;
import org.springframework.dao.CannotSerializeTransactionException;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionSystemException;
import org.springframework.transaction.support.DefaultTransactionStatus;

@Component
public class MyDataSourceTransactionManager {

  @Bean
  DataSourceTransactionManager getDataSourceTransactionManager(DataSource dataSource) {
    return new DataSourceTransactionManager(dataSource) {
      @Override
      protected void doCommit(DefaultTransactionStatus status) {
        try {
          super.doCommit(status);
        } catch (TransactionSystemException e) {
          Throwable throwable = e.getCause();
          if (throwable instanceof SQLException && "40001"
              .equals(((SQLException) throwable).getSQLState())) {
            throw new CannotSerializeTransactionException(throwable.getMessage(), throwable);
          }
          throw e;
        }
      }
    };
  }

}

您还可以使用 SQLExceptionTranslatorSQLException 转换为 DataAccessException 而不是检查 sql 错误代码。


import java.sql.SQLException;
import javax.sql.DataSource;
import org.springframework.context.annotation.Bean;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.UncategorizedSQLException;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.jdbc.support.SQLErrorCodeSQLExceptionTranslator;
import org.springframework.jdbc.support.SQLExceptionTranslator;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionSystemException;
import org.springframework.transaction.support.DefaultTransactionStatus;

@Component
public class MyDataSourceTransactionManager {

  @Bean
  DataSourceTransactionManager getDataSourceTransactionManager(DataSource dataSource) {
    final SQLExceptionTranslator exTranslator = new SQLErrorCodeSQLExceptionTranslator(dataSource);
    return new DataSourceTransactionManager(dataSource) {
      @Override
      protected void doCommit(DefaultTransactionStatus status) {
        try {
          super.doCommit(status);
        } catch (TransactionSystemException e) {
          if (e.getCause() instanceof SQLException) {
            DataAccessException exception = exTranslator
                .translate("commit", null, (SQLException) e.getCause());
            if (!(exception instanceof UncategorizedSQLException)) {
              throw exception;
            }
          }
          throw e;
        }
      }
    };
  }

}