Need a built-in way to add deadlock resilience to Dapper for existing repos without altering them
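I need to make all existing repositories (30+) tolerant of deadlocks, so that they recover from a deadlock with a log-wait-and-retry approach.
What I tried successfully: after some research I tailored a custom SqlResiliencyPolicy, built on Polly, to the project. It is in my answer below.
But what I am looking for: the current way (please find the answer below) requires me to either
- wrap all existing DB calls with await _policy.ExecuteAsync, or
- provide custom overloads that accept an IAsyncPolicy parameter and then call the intended method, as extensions of IDbConnection: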
public static async Task<T> GetAsync<T>(this IDbConnection connection, object primaryKey, IAsyncPolicy policy) =>
    await policy.ExecuteAsync(() => connection.GetAsync<T>(...));
Either way, I have to change all 30+ repos. Is there a built-in way in Dapper, or some other approach, where we can
"configure a Policy in startup and auto-magically all DB calls via Dapper become resilient (fall back to their fault-tolerant mechanism), similar to HTTP client resilience, where the policy is added while you register a client"?
That would keep code changes to a minimum: no need to touch the repos, only the startup.
I have the approach below, and it needs improving.
My approach after some research:
public class SqlResiliencyPolicy
{
    // 1205 is the SQL Server error number for "chosen as the deadlock victim".
    private readonly ISet<int> transientDbErrors = new HashSet<int>(new[] { 1205 });
    private readonly ILogger _logger;
    private readonly IConfiguration _configuration;
    public SqlResiliencyPolicy(ILogger logger, IConfiguration configuration)
    {
        _logger = logger;
        _configuration = configuration;
    }
    public IAsyncPolicy GetSqlResiliencyPolicy(int transientErrorRetries = 3)
    {
        return Policy
            .Handle<SqlException>(ex => transientDbErrors.Contains(ex.Number))
            .WaitAndRetryAsync(
                retryCount: transientErrorRetries,
                sleepDurationProvider: attempt => TimeSpan.FromMilliseconds(attempt * 100),
                (exception, sleepTime, reattempt, context) =>
                {
                    _logger.Log(LogLevel.Error, exception, $"Transient DB Failure while executing query, error number: {((SqlException)exception).Number}; reattempt number: {reattempt}");
                });
    }
}
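For reference, the sleepDurationProvider above gives a linear backoff: Polly passes a 1-based attempt number, so the waits grow by 100 ms per retry. A minimal, Polly-free sketch of that schedule follows; the BackoffSchedule class is a made-up name for illustration only and is not part of the original code.

```csharp
using System;
using System.Linq;

// Hypothetical helper (not in the original code): lists the waits that
// attempt => TimeSpan.FromMilliseconds(attempt * 100) yields for N retries.
public static class BackoffSchedule
{
    public static TimeSpan[] Delays(int retryCount) =>
        Enumerable.Range(1, retryCount) // attempt numbers start at 1
                  .Select(attempt => TimeSpan.FromMilliseconds(attempt * 100))
                  .ToArray();
}
```

With the default of three retries this gives waits of 100 ms, 200 ms and 300 ms, so a call that keeps deadlocking fails for good after roughly 600 ms of waiting plus the time of four executions.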
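In Startup:
services.AddScoped(_ => new SqlResiliencyPolicy(_logger, _configuration).GetSqlResiliencyPolicy());
Ctor DI: inject the policy into the constructor of each existing repo, with a private IAsyncPolicy backing field:
private readonly IAsyncPolicy _policy;
Last step: wrap all Dapper calls with
await _policy.ExecuteAsync(async () => {<existing DB call>});
Implemented a second approach ^^: this removes the need to inject the policy into the existing repos. Extension methods on IDbConnection take care of wrapping the policy around the existing methods.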
public class SqlResiliencePolicyFactory
{
    // 1205 is the SQL Server error number for "chosen as the deadlock victim".
    private readonly ISet<int> _transientDbErrors = new HashSet<int>(new[] { 1205 });
    private readonly ILogger _logger;
    private readonly IConfiguration _configuration;
    public SqlResiliencePolicyFactory(ILogger logger, IConfiguration configuration)
    {
        _logger = logger;
        _configuration = configuration;
    }
    // Registers one synchronous and one asynchronous retry policy under fixed keys.
    public IPolicyRegistry<string> GetSqlResiliencePolicies(int transientErrorRetries = 3)
    {
        return new PolicyRegistry
        {
            {
                "DbDeadLockResilience",
                Policy
                    .Handle<SqlException>(ex => _transientDbErrors.Contains(ex.Number))
                    .WaitAndRetry(
                        retryCount: transientErrorRetries,
                        sleepDurationProvider: attempt => TimeSpan.FromMilliseconds(attempt * 100),
                        onRetry: LogRetryAction)
            },
            {
                "DbDeadLockResilienceAsync",
                Policy
                    .Handle<SqlException>(ex => _transientDbErrors.Contains(ex.Number))
                    .WaitAndRetryAsync(
                        retryCount: transientErrorRetries,
                        sleepDurationProvider: attempt => TimeSpan.FromMilliseconds(attempt * 100),
                        onRetry: LogRetryAction)
            }
        };
    }
    // Kept on one line so the log message does not contain embedded line breaks.
    private void LogRetryAction(Exception exception, TimeSpan sleepTime, int reattemptCount, Context context) =>
        _logger.Log(
            LogLevel.Warning,
            exception,
            $"Transient DB Failure while executing query, error number: {((SqlException)exception).Number}; reattempt number: {reattemptCount}");
}
In Startup:
DapperExtensions.SetPolicies(new SqlResiliencePolicyFactory(_logger, _configuration)
    .GetSqlResiliencePolicies());
Create extension methods in a separate class to wrap the policy around the existing methods your repos call.
Extension methods:
public static class DapperExtensions
{
    // Default to no-op policies so the wrappers also work before SetPolicies is called.
    private static Policy _policy = Policy.NoOp();
    private static IAsyncPolicy _asyncPolicy = Policy.NoOpAsync();
    public static void SetPolicies(IReadOnlyPolicyRegistry<string> readOnlyPolicyRegistry)
    {
        _policy = readOnlyPolicyRegistry.Get<Policy>("DbDeadLockResilience");
        _asyncPolicy = readOnlyPolicyRegistry.Get<IAsyncPolicy>("DbDeadLockResilienceAsync");
    }
    public static T GetFirstWithRetry<T>(this IDbConnection connection,
        string? sql = null, object? parameters = null, IDbTransaction? transaction = null) where T : class =>
        _policy.Execute(() => connection.GetFirst<T>(sql, parameters, transaction));
    public static T QueryFirstOrDefaultWithRetry<T>(this IDbConnection connection, string sql,
        object? parameters = null, IDbTransaction? transaction = null) =>
        _policy.Execute(() => connection.QueryFirstOrDefault<T>(sql, parameters, transaction));
    public static async Task<bool> UpdateAsyncWithRetry<T>(this IDbConnection connection, T entityToUpdate, IEnumerable<string> columnsToUpdate,
        IDbTransaction? transaction = null) where T : class =>
        await _asyncPolicy.ExecuteAsync(async () => await connection.UpdateAsync(entityToUpdate, columnsToUpdate, transaction));
    // Similarly, add overloads for all the other methods used in the existing repos.
}
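Now,
- the existing repos are independent of the policy (nothing is injected into the repos).
- The policy is kept in a separate place, in line with SRP.
- The policy used by the Dapper extensions can be swapped, which makes testing easier.
So the existing repos only have to change the method names: they call the wrappers above instead of the Dapper methods themselves, and the policy is applied. Don't forget to run a regression test on the repos.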
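The following is, so far, the most apt way to do this with minimal or no changes to the existing repos. Thanks to @Sergey Akopov for the blog post and to my colleague who pointed me to it.
Short answer: use the decorator pattern to wrap the SQL client's connection and command instances, and inject Polly's retry policy into these decorators. This way every SQL execution endpoint gets wrapped with the retry policy. It is compatible with Dapper, since Dapper is a set of extensions on IDbConnection.
Create a DI-able retry policy class that encapsulates the Polly policy. We could also decouple the policy completely into a separate class and register that for DI (not shown in this answer, but done in the other answer; don't forget to use a PolicyRegistry if you have more than one policy).
Git repo: https://github.com/VinZCodz/SqlTransientFaultHandling
Details:
The policy interface has no async methods, because the decorators below override only the synchronous DbConnection and DbCommand members; the async methods they inherit from the base classes fall back to those synchronous overrides.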
public interface IRetryPolicy
{
    void Execute(Action operation);
    TResult Execute<TResult>(Func<TResult> operation);
}
The concrete implementation, which embeds the policy and wraps the retry logic around every DB call made through SqlClient and Dapper.
public class RetryPolicy : IRetryPolicy
{
    private readonly ILogger<RetryPolicy> _logger;
    private readonly Policy _retryPolicy;
    // 1205 is the SQL Server error number for "chosen as the deadlock victim".
    private readonly ISet<int> _transientDbErrors = new HashSet<int>(new[] { 1205 });
    private const int _transientErrorRetries = 3;
    public RetryPolicy(ILogger<RetryPolicy> logger)
    {
        _logger = logger;
        _retryPolicy = Policy
            .Handle<SqlException>(ex => _transientDbErrors.Contains(ex.Number))
            .WaitAndRetry(
                retryCount: _transientErrorRetries,
                sleepDurationProvider: attempt => TimeSpan.FromMilliseconds(attempt * 100),
                onRetry: LogRetryAction);
    }
    public void Execute(Action operation) => _retryPolicy.Execute(operation.Invoke);
    public TResult Execute<TResult>(Func<TResult> operation) => _retryPolicy.Execute(() => operation.Invoke());
    private void LogRetryAction(Exception exception, TimeSpan sleepTime, int reattemptCount, Context context) =>
        _logger.LogWarning(
            exception,
            $"Transient DB Failure while executing query, error number: {((SqlException)exception).Number}; reattempt number: {reattemptCount}");
}
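To make the retry semantics concrete, here is a rough, Polly-free sketch of what a wait-and-retry policy with a transient-error predicate does. It is only an illustration under my own assumptions, not Polly's implementation: SimpleRetry and its parameters are hypothetical names, and the delay is hard-coded to the same attempt * 100 ms used above.

```csharp
using System;
using System.Threading;

// Hypothetical illustration (not Polly's code): one initial attempt plus up to
// retryCount retries, retrying only when isTransient says the error is transient.
public static class SimpleRetry
{
    public static TResult Execute<TResult>(
        Func<TResult> operation,
        Func<Exception, bool> isTransient,
        int retryCount,
        Action<Exception, int> onRetry)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                return operation();
            }
            // Once the retries are used up, or for a non-transient error, the
            // filter is false and the exception propagates to the caller.
            catch (Exception ex) when (attempt <= retryCount && isTransient(ex))
            {
                onRetry(ex, attempt); // log the retry
                Thread.Sleep(TimeSpan.FromMilliseconds(attempt * 100)); // linear backoff
            }
        }
    }
}
```

An operation that always fails with a transient error is therefore executed four times with three retries logged, and the last exception still reaches the caller.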
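Now we need to somehow inject this policy into SqlClient's connection and command. That takes a sealed class which 'is-a' DbConnection (so the DAL endpoints stay unchanged) and also 'has-a' DbConnection (to mimic the operations, but with retry):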
public sealed class ReliableSqlDbConnection : DbConnection
{
    private readonly SqlConnection _underlyingConnection;
    private readonly IRetryPolicy _retryPolicy;
    private bool _disposedValue;
    private string _connectionString;
    public ReliableSqlDbConnection(string connectionString, IRetryPolicy retryPolicy)
    {
        _connectionString = connectionString;
        _retryPolicy = retryPolicy;
        _underlyingConnection = new SqlConnection(connectionString);
    }
    public override string ConnectionString
    {
        get => _connectionString;
        set => _underlyingConnection.ConnectionString = _connectionString = value;
    }
    // Opening the connection goes through the retry policy.
    public override void Open()
    {
        _retryPolicy.Execute(() =>
        {
            if (_underlyingConnection.State != ConnectionState.Open)
            {
                _underlyingConnection.Open();
            }
        });
    }
    public override string Database => _underlyingConnection.Database;
    public override string DataSource => _underlyingConnection.DataSource;
    public override string ServerVersion => _underlyingConnection.ServerVersion;
    public override ConnectionState State => _underlyingConnection.State;
    public override void ChangeDatabase(string databaseName) => _underlyingConnection.ChangeDatabase(databaseName);
    public override void Close() => _underlyingConnection.Close();
    protected override DbTransaction BeginDbTransaction(IsolationLevel isolationLevel) => _underlyingConnection.BeginTransaction(isolationLevel);
    // Every command created from this connection is wrapped in the retrying decorator.
    protected override DbCommand CreateDbCommand() => new ReliableSqlDbCommand(_underlyingConnection.CreateCommand(), _retryPolicy);
}
Since we instantiate the SqlConnection ourselves, we also need to dispose of it properly, following the derived-type dispose pattern recommended by Microsoft:
protected override void Dispose(bool disposing)
{
    if (!_disposedValue)
    {
        if (disposing)
        {
            if (_underlyingConnection.State == ConnectionState.Open)
            {
                _underlyingConnection.Close();
            }
            _underlyingConnection.Dispose();
        }
        _disposedValue = true;
    }
    base.Dispose(disposing);
}
A similar approach for DbCommand:
public sealed class ReliableSqlDbCommand : DbCommand
{
    private readonly SqlCommand _underlyingSqlCommand;
    private readonly IRetryPolicy _retryPolicy;
    private bool _disposedValue;
    public ReliableSqlDbCommand(SqlCommand command, IRetryPolicy retryPolicy)
    {
        _underlyingSqlCommand = command;
        _retryPolicy = retryPolicy;
    }
    public override string CommandText
    {
        get => _underlyingSqlCommand.CommandText;
        set => _underlyingSqlCommand.CommandText = value;
    }
    public override int CommandTimeout
    {
        get => _underlyingSqlCommand.CommandTimeout;
        set => _underlyingSqlCommand.CommandTimeout = value;
    }
    public override CommandType CommandType
    {
        get => _underlyingSqlCommand.CommandType;
        set => _underlyingSqlCommand.CommandType = value;
    }
    public override bool DesignTimeVisible
    {
        get => _underlyingSqlCommand.DesignTimeVisible;
        set => _underlyingSqlCommand.DesignTimeVisible = value;
    }
    public override UpdateRowSource UpdatedRowSource
    {
        get => _underlyingSqlCommand.UpdatedRowSource;
        set => _underlyingSqlCommand.UpdatedRowSource = value;
    }
    protected override DbConnection DbConnection
    {
        get => _underlyingSqlCommand.Connection;
        set => _underlyingSqlCommand.Connection = (SqlConnection)value;
    }
    protected override DbParameterCollection DbParameterCollection => _underlyingSqlCommand.Parameters;
    protected override DbTransaction DbTransaction
    {
        get => _underlyingSqlCommand.Transaction;
        set => _underlyingSqlCommand.Transaction = (SqlTransaction)value;
    }
    public override void Cancel() => _underlyingSqlCommand.Cancel();
    // The execution endpoints are the ones that go through the retry policy.
    public override int ExecuteNonQuery() => _retryPolicy.Execute(() => _underlyingSqlCommand.ExecuteNonQuery());
    public override object ExecuteScalar() => _retryPolicy.Execute(() => _underlyingSqlCommand.ExecuteScalar());
    public override void Prepare() => _retryPolicy.Execute(() => _underlyingSqlCommand.Prepare());
    protected override DbParameter CreateDbParameter() => _underlyingSqlCommand.CreateParameter();
    protected override DbDataReader ExecuteDbDataReader(CommandBehavior behavior) => _retryPolicy.Execute(() => _underlyingSqlCommand.ExecuteReader(behavior));
    protected override void Dispose(bool disposing)
    {
        if (!_disposedValue)
        {
            if (disposing)
            {
                _underlyingSqlCommand.Dispose();
            }
            _disposedValue = true;
        }
        base.Dispose(disposing);
    }
}
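One point worth spelling out: the decorator overrides only the synchronous members, yet async Dapper calls still pass through it. A class deriving from DbCommand inherits default async methods that delegate to the synchronous ones unless they are overridden. The sketch below illustrates this with a hypothetical SyncOnlyCommand (a stand-in written for this example, not part of the original code) that needs no database:

```csharp
using System;
using System.Data;
using System.Data.Common;

// Hypothetical stand-in for a decorator that overrides only synchronous members.
public sealed class SyncOnlyCommand : DbCommand
{
    // Counts how often the synchronous override was invoked.
    public int SyncCalls { get; private set; }

    public override int ExecuteNonQuery()
    {
        SyncCalls++;
        return 42; // pretend 42 rows were affected
    }

    // The remaining abstract members are stubbed; they are not used here.
    public override string CommandText { get; set; } = "";
    public override int CommandTimeout { get; set; }
    public override CommandType CommandType { get; set; } = CommandType.Text;
    public override bool DesignTimeVisible { get; set; }
    public override UpdateRowSource UpdatedRowSource { get; set; }
    protected override DbConnection DbConnection { get; set; }
    protected override DbTransaction DbTransaction { get; set; }
    protected override DbParameterCollection DbParameterCollection => throw new NotSupportedException();
    public override void Cancel() { }
    public override object ExecuteScalar() => throw new NotSupportedException();
    public override void Prepare() { }
    protected override DbParameter CreateDbParameter() => throw new NotSupportedException();
    protected override DbDataReader ExecuteDbDataReader(CommandBehavior behavior) => throw new NotSupportedException();
}
```

Awaiting new SyncOnlyCommand().ExecuteNonQueryAsync() runs the ExecuteNonQuery override above. The trade-off is that such "async" calls execute synchronously and block the calling thread, including during the retry waits.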
On the existing DAL side:
DI:
services.AddScoped<IRetryPolicy, RetryPolicy>();
services.Configure<DbConnectionOption>(options =>
{
    options.ConnectionString = connectionString;
});
Lazy-load the decorator:
_connection = new Lazy<IDbConnection>(() =>
{
    return new ReliableSqlDbConnection(_dbOptions.ConnectionString, _retryPolicy);
});
xUnit test: this test actually creates a deadlock on a single session and retries it.
Thanks to @Martin Smith for the awesome script used here:
[Fact]
public async Task It_creates_reliablesqldbConnection_and_deadlock_itself_to_log_and_retry()
{
    var logger = new FakeLogger<RetryPolicy>(); // create your own logger.
    using var reliableSqlDbConnection = new ReliableSqlDbConnection(_fixture.Configuration["ConnectionStrings:DataContext"],
        new RetryPolicy(logger)); // create your own fixture.
    // Awesome script which deadlocks itself on a single connection and process, using its own metadata.
    // Await the assertion: an un-awaited Assert.ThrowsAsync would not fail the test if no SqlException were thrown.
    await Assert.ThrowsAsync<SqlException>(() => reliableSqlDbConnection.ExecuteAsync(
        @"BEGIN TRAN
        CREATE TYPE dbo.OptionIDs AS TABLE( OptionID INT PRIMARY KEY )
        EXEC ('DECLARE @OptionIDs dbo.OptionIDs;')
        ROLLBACK "));
    Assert.Equal(LogLevel.Warning, logger.Logs.Select(g => g.Key).First());
    var retries = logger.Logs[LogLevel.Warning].First();
    Assert.Equal(3, retries.Count());
    Assert.Equal("Transient DB Failure while executing query, error number: 1205; reattempt number: 1", retries.First());
}
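Summary:
With this, opening the connection, ExecuteReader, ExecuteScalar, ExecuteNonQuery and so on all have retry behaviour wrapped around them, and these are what every Dapper endpoint ultimately calls.
This keeps code changes to a minimum: no need to touch the repos, only the startup. Just provide a wrapper/decorator for the SQL client's connection and command, and you can inject a custom policy and retry with it.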