Need a built-in way to add deadlock resilience to Dapper for existing repos without altering them

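I need to make all existing repos (around 30+) fault tolerant to deadlocks, so that they recover from a deadlock with a log-and-wait approach.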

Tried with success: after some research, I've answered below with a custom SqlResiliencyPolicy built on Polly and tailored to the project.

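But what I'm looking for: the present way (please find the answer below) demands that I either

  1. wrap all the existing DB calls with await _policy.ExecuteAsync, or
  2. provide custom overloads that accept an IAsyncPolicy parameter and then call the intended method. A sort of extension to IDbConnection: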

public static async Task<T> GetAsync<T>(this IDbConnection connection, object primaryKey, IAsyncPolicy policy) => await policy.ExecuteAsync(async () => await connection.GetAsync<T>(...));

Either way, I need to change all 30+ of my repos. Is there a built-in way in Dapper, or some other approach, where we can

"configure a policy in startup and auto-magically all DB calls via Dapper become resilient (fall back to their fault-tolerant mechanism), similar to HTTP client resilience, where the policy is added while you register a client"

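That way, code changes are kept to a minimum: no need to touch the repos, only the startup.

I have the approach below and need an improvement over it.

My approach after some research: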

public class SqlResiliencyPolicy 
{ 
    private readonly ISet<int> transientDbErrors = new HashSet<int>(new[] { 1205 });
    private readonly ILogger _logger;
    private readonly IConfiguration _configuration;

    public SqlResiliencyPolicy(ILogger logger, IConfiguration configuration)
    {
        _logger = logger;
        _configuration = configuration;
    }

    public IAsyncPolicy GetSqlResiliencyPolicy(int transientErrorRetries = 3)
    {
        return Policy
            .Handle<SqlException>(ex => transientDbErrors.Contains(ex.Number))
            .WaitAndRetryAsync(
                retryCount: transientErrorRetries,
                // Linear back-off: 100 ms, 200 ms, 300 ms, ...
                sleepDurationProvider: attempt => TimeSpan.FromMilliseconds(attempt * 100),
                onRetry: (exception, sleepTime, reattempt, context) =>
                {
                    _logger.Log(LogLevel.Error, exception, $"Transient DB Failure while executing query, error number: {((SqlException)exception).Number}; reattempt number: {reattempt}");
                });
    }
}
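To make the retry schedule concrete, here is a dependency-free sketch of roughly what WaitAndRetry does with these settings: one initial attempt plus up to three retries, waiting 100 ms, 200 ms and 300 ms in between. `RetrySketch` and `SimulatedSqlException` are hypothetical names for illustration only (SqlException cannot be constructed directly), and this is not how Polly is implemented internally.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for SqlException, which has no public constructor.
public sealed class SimulatedSqlException : Exception
{
    public int Number { get; }

    public SimulatedSqlException(int number)
        : base($"Simulated SQL error {number}") => Number = number;
}

public static class RetrySketch
{
    // 1205 is the SQL Server error number for "chosen as deadlock victim".
    private static readonly ISet<int> TransientErrors = new HashSet<int> { 1205 };

    public static T Execute<T>(Func<T> operation, int retries = 3,
                               Action<int, TimeSpan> onRetry = null)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                return operation();
            }
            // Only transient errors are retried, and only while retries remain;
            // anything else (or the final failure) propagates to the caller.
            catch (SimulatedSqlException ex)
                when (TransientErrors.Contains(ex.Number) && attempt <= retries)
            {
                var delay = TimeSpan.FromMilliseconds(attempt * 100);
                onRetry?.Invoke(attempt, delay); // the logging hook
                Thread.Sleep(delay);
            }
        }
    }
}
```

With `retries = 3`, an operation that keeps deadlocking is attempted four times in total before the exception reaches the caller.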

In Startup:

 services.AddScoped(_ => new SqlResiliencyPolicy(_logger, _configuration).GetSqlResiliencyPolicy());

Ctor DI: in the existing repos, inject the policy through the constructor into a private IAsyncPolicy backing field:

private readonly IAsyncPolicy _policy;

Final step: wrap all the Dapper calls with

await _policy.ExecuteAsync(async () => {<existing DB call>});

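Implemented a second approach ^^: this decouples the policy from the existing repos, so it no longer has to be injected into them. Extension methods on IDbConnection take care of wrapping the policy around the existing methods.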

public class SqlResiliencePolicyFactory
{
    private readonly ISet<int> _transientDbErrors = new HashSet<int>(new[] { 1205 });
    private readonly ILogger _logger;
    private readonly IConfiguration _configuration;

    public SqlResiliencePolicyFactory(ILogger logger, IConfiguration configuration)
    {
        _logger = logger;
        _configuration = configuration;
    }

    public IPolicyRegistry<string> GetSqlResiliencePolicies(int transientErrorRetries = 3)
    {
        return new PolicyRegistry
        {
            { 
                "DbDeadLockResilience", 
                Policy
                    .Handle<SqlException>(ex => _transientDbErrors.Contains(ex.Number))
                    .WaitAndRetry(
                        retryCount: transientErrorRetries,
                        sleepDurationProvider: attempt => TimeSpan.FromMilliseconds(attempt * 100),
                        onRetry: LogRetryAction)
            },
            { 
                "DbDeadLockResilienceAsync", 
                Policy
                    .Handle<SqlException>(ex => _transientDbErrors.Contains(ex.Number))
                    .WaitAndRetryAsync(
                        retryCount: transientErrorRetries,
                        sleepDurationProvider: attempt => TimeSpan.FromMilliseconds(attempt * 100),
                        onRetry: LogRetryAction)
            }
        };
    }
    
    // A single-line message: a verbatim (@$) string would put the line breaks and indentation into the log.
    private void LogRetryAction(Exception exception, TimeSpan sleepTime, int reattemptCount, Context context) =>
        _logger.Log(
            LogLevel.Warning,
            exception,
            $"Transient DB Failure while executing query, error number: {((SqlException)exception).Number}; reattempt number: {reattemptCount}");
}

In Startup:

DapperExtensions.SetPolicies(new SqlResiliencePolicyFactory(_logger, _configuration)
                            .GetSqlResiliencePolicies());

Create extension methods in a separate class that wrap the policy around the methods your repos already call. Extension methods:

public static class DapperExtensions
{
    private static Policy _policy = Policy.NoOp();
    private static IAsyncPolicy _asyncPolicy = Policy.NoOpAsync();

    public static void SetPolicies(IReadOnlyPolicyRegistry<string> readOnlyPolicyRegistry)
    {
        _policy = readOnlyPolicyRegistry.Get<Policy>("DbDeadLockResilience");
        _asyncPolicy = readOnlyPolicyRegistry.Get<IAsyncPolicy>("DbDeadLockResilienceAsync");
    }

    public static T GetFirstWithRetry<T>(this IDbConnection connection,
                                        string? sql = null, object? parameters = null, IDbTransaction? transaction = null) where T : class =>
        _policy.Execute(() => connection.GetFirst<T>(sql, parameters, transaction));

    public static T QueryFirstOrDefaultWithRetry<T>(this IDbConnection connection, string sql,
                                          object? parameters = null, IDbTransaction? transaction = null) =>
        _policy.Execute(() => connection.QueryFirstOrDefault<T>(sql, parameters, transaction));

    public static async Task<bool> UpdateAsyncWithRetry<T>(this IDbConnection connection, T entityToUpdate, IEnumerable<string> columnsToUpdate,
                                                      IDbTransaction? transaction = null) where T : class =>
        await _asyncPolicy.ExecuteAsync(async () => await connection.UpdateAsync(entityToUpdate, columnsToUpdate, transaction));

    //Similarly, add overloads to all the other methods in existing repo.
}

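Now:

  1. The existing repos are independent of the policies (nothing is injected into the repos).
  2. The policies are kept in a separate place, in line with SRP.
  3. The policies behind the Dapper extensions can be swapped, which eases testing.

The existing repos therefore have to change the method names they call: they call the wrappers above instead of the Dapper methods themselves, and the policies get applied. Don't forget to run a regression test on the repos.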
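Point 3 can be illustrated with a small self-contained sketch of a statically held, swappable policy. `IExecPolicy`, `NoOpExecPolicy`, `CountingExecPolicy` and `PolicyHolder` are hypothetical stand-ins (for Polly's policy types and for DapperExtensions respectively), used here only so the example runs without Polly or Dapper.

```csharp
using System;

// Hypothetical stand-in for a synchronous policy type.
public interface IExecPolicy
{
    T Execute<T>(Func<T> operation);
}

// Default: runs the operation as-is, like a no-op policy.
public sealed class NoOpExecPolicy : IExecPolicy
{
    public T Execute<T>(Func<T> operation) => operation();
}

// Test double: records how many operations were routed through the policy.
public sealed class CountingExecPolicy : IExecPolicy
{
    public int Calls { get; private set; }

    public T Execute<T>(Func<T> operation)
    {
        Calls++;
        return operation();
    }
}

// Plays the role of the static extension class: callers never see the policy.
public static class PolicyHolder
{
    private static IExecPolicy _policy = new NoOpExecPolicy();

    public static void SetPolicy(IExecPolicy policy) =>
        _policy = policy ?? throw new ArgumentNullException(nameof(policy));

    public static T Run<T>(Func<T> operation) => _policy.Execute(operation);
}
```

One caveat of this design: the policy is mutable static state, so tests that swap it should not run in parallel with each other.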

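Below is the most suitable way so far, with minimal or no changes to the existing repos. Thanks to @Sergey Akopov for the blog post, and to my colleague who pointed me to it.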

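Short answer: use the decorator pattern to wrap the SQL client's connection and command instances, and inject Polly's retry policy into these decorators. That way every SQL execution endpoint is wrapped with the retry policy. This is compatible with Dapper, because Dapper is a set of extensions on IDbConnection.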
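As a minimal, self-contained sketch of the decorator idea (not the actual ReliableSqlDbCommand shown further down): the decorator implements the same interface as the thing it wraps, so calling code does not change, and the retry lives in one place. `IExecutable`, `DeadlockException`, `FlakyCommand` and `RetryingCommand` are hypothetical names for illustration.

```csharp
using System;

// Hypothetical minimal "command" abstraction, standing in for DbCommand.
public interface IExecutable
{
    int ExecuteNonQuery();
}

// Hypothetical stand-in for a deadlock-victim SqlException (error 1205).
public sealed class DeadlockException : Exception { }

// Fails a fixed number of times, then succeeds and reports one affected row.
public sealed class FlakyCommand : IExecutable
{
    private int _failuresLeft;

    public FlakyCommand(int failures) => _failuresLeft = failures;

    public int ExecuteNonQuery()
    {
        if (_failuresLeft-- > 0) throw new DeadlockException();
        return 1;
    }
}

// Decorator: "is-a" IExecutable for callers, "has-a" IExecutable it delegates to.
public sealed class RetryingCommand : IExecutable
{
    private readonly IExecutable _inner;
    private readonly int _retries;

    public int Attempts { get; private set; }

    public RetryingCommand(IExecutable inner, int retries)
    {
        _inner = inner;
        _retries = retries;
    }

    public int ExecuteNonQuery()
    {
        for (var attempt = 0; ; attempt++)
        {
            Attempts++;
            try
            {
                return _inner.ExecuteNonQuery();
            }
            catch (DeadlockException) when (attempt < _retries)
            {
                // Swallow and loop; a real policy would also log and wait here.
            }
        }
    }
}
```

Code that only knows about `IExecutable` keeps calling `ExecuteNonQuery()` as before; whether it is handed a `FlakyCommand` or a `RetryingCommand` wrapping one is decided where the object is created.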

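Create a DI'able retry policy that encapsulates the Polly policy. We could also fully decouple the policy into a separate class and register that for DI (not shown in this answer, but done in the other answers; don't forget to use a PolicyRegistry if you have more than one policy).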

Git repo: https://github.com/VinZCodz/SqlTransientFaultHandling

Details:

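Interface for the policy. It has no async methods, because the decorators below override only the synchronous members of DbConnection and DbCommand; by default, the base-class async methods call through to those synchronous overrides.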

public interface IRetryPolicy
{
 void Execute(Action operation);
 TResult Execute<TResult>(Func<TResult> operation);
}

Concrete implementation, which embeds the Polly policy and wraps the retry logic around every DB call made through SqlClient and Dapper.

public class RetryPolicy : IRetryPolicy
{
    private readonly ILogger<RetryPolicy> _logger;
    private readonly Policy _retryPolicy;

    private readonly ISet<int> _transientDbErrors = new HashSet<int>(new[] { 1205 });
    private const int _transientErrorRetries = 3;

    public RetryPolicy(ILogger<RetryPolicy> logger)
    {
        _logger = logger;
        _retryPolicy = Policy
                        .Handle<SqlException>(ex => _transientDbErrors.Contains(ex.Number))
                        .WaitAndRetry(
                        retryCount: _transientErrorRetries,
                        sleepDurationProvider: attempt => TimeSpan.FromMilliseconds(attempt * 100),
                        onRetry: LogRetryAction);
    }

    public void Execute(Action operation) => _retryPolicy.Execute(operation.Invoke);

    public TResult Execute<TResult>(Func<TResult> operation) => _retryPolicy.Execute(() => operation.Invoke());

    private void LogRetryAction(Exception exception, TimeSpan sleepTime, int reattemptCount, Context context) =>
        _logger.LogWarning(
            exception,
            $"Transient DB Failure while executing query, error number: {((SqlException)exception).Number}; reattempt number: {reattemptCount}");
}

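Now we need some way to inject this policy into SqlClient's connection and command. That takes a sealed class which 'is-a' DbConnection (so the DAL endpoints stay the same) and also 'has-a' DbConnection (to mimic its operations, but with retries):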

 public sealed class ReliableSqlDbConnection : DbConnection
 {
    private readonly SqlConnection _underlyingConnection;
    private readonly IRetryPolicy _retryPolicy;

    private bool _disposedValue;
    private string _connectionString;

    public ReliableSqlDbConnection(string connectionString, IRetryPolicy retryPolicy)
    {
        _connectionString = connectionString;
        _retryPolicy = retryPolicy;
        _underlyingConnection = new SqlConnection(connectionString);
    }

    public override string ConnectionString
    {
        get => _connectionString;
        set => _underlyingConnection.ConnectionString = _connectionString = value;
    }
    public override void Open()
    {
        _retryPolicy.Execute(() =>
        {
            if (_underlyingConnection.State != ConnectionState.Open)
            {
                _underlyingConnection.Open();
            }
        });
    }
    public override string Database => _underlyingConnection.Database;
    public override string DataSource => _underlyingConnection.DataSource;
    public override string ServerVersion => _underlyingConnection.ServerVersion;
    public override ConnectionState State => _underlyingConnection.State;
    public override void ChangeDatabase(string databaseName) => _underlyingConnection.ChangeDatabase(databaseName);
    public override void Close() => _underlyingConnection.Close();
    protected override DbTransaction BeginDbTransaction(IsolationLevel isolationLevel) => _underlyingConnection.BeginTransaction(isolationLevel);
    protected override DbCommand CreateDbCommand() => new ReliableSqlDbCommand(_underlyingConnection.CreateCommand(), _retryPolicy);
} 

Since we instantiate the SqlConnection ourselves, we also need to dispose of it properly, following the dispose pattern Microsoft recommends for derived types:

protected override void Dispose(bool disposing)
{
    if (!_disposedValue)
    {
        if (disposing)
        {
            if (_underlyingConnection.State == ConnectionState.Open)
            {
                _underlyingConnection.Close();
            }
            _underlyingConnection.Dispose();
        }
        _disposedValue = true;
    }
    base.Dispose(disposing);
}

A similar approach is followed for DbCommand:

public sealed class ReliableSqlDbCommand : DbCommand
{
    private readonly SqlCommand _underlyingSqlCommand;
    private readonly IRetryPolicy _retryPolicy;

    private bool _disposedValue;

    public ReliableSqlDbCommand(SqlCommand command, IRetryPolicy retryPolicy)
    {
        _underlyingSqlCommand = command;
        _retryPolicy = retryPolicy;
    }

    public override string CommandText
    {
        get => _underlyingSqlCommand.CommandText;
        set => _underlyingSqlCommand.CommandText = value;
    }

    public override int CommandTimeout
    {
        get => _underlyingSqlCommand.CommandTimeout;
        set => _underlyingSqlCommand.CommandTimeout = value;
    }

    public override CommandType CommandType
    {
        get => _underlyingSqlCommand.CommandType;
        set => _underlyingSqlCommand.CommandType = value;
    }

    public override bool DesignTimeVisible
    {
        get => _underlyingSqlCommand.DesignTimeVisible;
        set => _underlyingSqlCommand.DesignTimeVisible = value;
    }

    public override UpdateRowSource UpdatedRowSource
    {
        get => _underlyingSqlCommand.UpdatedRowSource;
        set => _underlyingSqlCommand.UpdatedRowSource = value;
    }

    protected override DbConnection DbConnection
    {
        get => _underlyingSqlCommand.Connection;
        set => _underlyingSqlCommand.Connection = (SqlConnection)value;
    }

    protected override DbParameterCollection DbParameterCollection => _underlyingSqlCommand.Parameters;

    protected override DbTransaction DbTransaction
    {
        get => _underlyingSqlCommand.Transaction;
        set => _underlyingSqlCommand.Transaction = (SqlTransaction)value;
    }

    public override void Cancel() => _underlyingSqlCommand.Cancel();

    public override int ExecuteNonQuery() => _retryPolicy.Execute(() => _underlyingSqlCommand.ExecuteNonQuery());

    public override object ExecuteScalar() => _retryPolicy.Execute(() => _underlyingSqlCommand.ExecuteScalar());

    public override void Prepare() => _retryPolicy.Execute(() => _underlyingSqlCommand.Prepare());

    protected override DbParameter CreateDbParameter() => _underlyingSqlCommand.CreateParameter();

    protected override DbDataReader ExecuteDbDataReader(CommandBehavior behavior) => _retryPolicy.Execute(() => _underlyingSqlCommand.ExecuteReader(behavior));

    protected override void Dispose(bool disposing)
    {
        if (!_disposedValue)
        {
            if (disposing)
            {
                _underlyingSqlCommand.Dispose();
            }
            _disposedValue = true;
        }
        base.Dispose(disposing);
    }
}

On the existing DAL side:

DI:

services.AddScoped<IRetryPolicy, RetryPolicy>();
services.Configure<DbConnectionOption>(options =>
{
    options.ConnectionString = connectionString;
});

Lazy-load the decorator:

_connection = new Lazy<IDbConnection>(() =>
{
    return new ReliableSqlDbConnection(_dbOptions.ConnectionString, _retryPolicy);
});

Xunit test: this test actually creates a deadlock on a single session and retries it.

Thanks to @Martin Smith for the awesome script used below:

[Fact]
public async Task It_creates_reliablesqldbConnection_and_deadlock_itself_to_log_and_retry()
{
    var logger = new FakeLogger<RetryPolicy>(); //create your own logger.
    using var reliableSqlDbConnection = new ReliableSqlDbConnection(_fixture.Configuration["ConnectionStrings:DataContext"],
                                                                    new RetryPolicy(logger)); //create your own fixture.

    //Awesome script which deadlocks itself on a single connection and process, using its own metadata.
    //The assertion must be awaited; otherwise the test can finish before the query has run.
    await Assert.ThrowsAsync<SqlException>(() => reliableSqlDbConnection.ExecuteAsync(
                                            @"BEGIN TRAN
                                            CREATE TYPE dbo.OptionIDs AS TABLE( OptionID INT PRIMARY KEY )
                                            EXEC ('DECLARE @OptionIDs dbo.OptionIDs;')
                                            ROLLBACK "));
                                            
    Assert.Equal(LogLevel.Warning, logger.Logs.Select(g => g.Key).First());

    var retries = logger.Logs[LogLevel.Warning].First();

    Assert.Equal(3, retries.Count());
    Assert.Equal("Transient DB Failure while executing query, error number: 1205; reattempt number: 1", retries.First());
}

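Summary: with this, Open Connection, ExecuteReader, ExecuteScalar, ExecuteNonQuery, etc. all get retry capability wrapped around them, and these are what every Dapper endpoint ultimately invokes.

This way, code changes are kept to a minimum: no need to touch the repos, only the startup. Providing a wrapper/decorator for the SQL client's connection and command is enough to inject a custom policy and retry with it.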