为什么在使用工作单元模式时会耗尽 SQL 连接?
Why would I run out of SQL connections using the unit of work pattern?
当我们将许多消息放在总线上,并且它们在我们的业务逻辑中调用一个流程时,就会发生此错误:
The timeout period elapsed prior to obtaining a connection from the
pool. This may have occurred because all pooled connections were in
use and max pool size was reached.
当只有 15 条消息调用我们的流程时,不会发生这种情况。但是,当有 80 条或 130 条消息调用该流程时,确实会发生这种情况。
我们正在使用工作单元模式,连接在使用后会被关闭。所以我很难理解,为什么连接没有回到池中供下一个流程使用。
这是我们的应用程序中工作单元的使用方式:
// Create a transactional unit of work; leaving the using block disposes it.
using (var unitOfWork = _uowFactory.Create(true))
{
    // Stage the accrual histories through the repository, then commit them.
    var accrualRepo = unitOfWork.AccrualRepo;
    await accrualRepo.AddAccrualHistoriesAsync(histories);
    await unitOfWork.CommitAsync();
}
这是工厂创建并返回 uow 的方式:
/// <summary>
/// Creates <see cref="UnitOfWork"/> instances wired with the dependencies
/// that were injected into this factory.
/// </summary>
public class UnitOfWorkFactory : IUnitOfWorkFactory
{
    private readonly IConfiguration _configuration;
    private readonly IMediator _mediator;
    private readonly IStateAccessor _stateAccessor;
    private readonly ITimeProvider _timeProvider;

    // NOTE(review): this one connection instance is passed to every unit of work
    // created by this factory instance, and each unit of work disposes it. If a
    // single factory instance is used for more than one Create call (for example
    // because it is held by a longer-lived consumer), the units of work share and
    // re-open the same connection object - confirm the factory's lifetime at its
    // injection sites.
    private readonly IDbConnection _connection;

    private readonly IAccrualMapper _accrualMapper;
    private readonly ILogger<RepoBase> _logger;

    /// <summary>
    /// Stores the dependencies that are later forwarded to each created unit of work.
    /// </summary>
    public UnitOfWorkFactory(IConfiguration configuration, IDbConnection sqlConnection, IMediator mediator,
        IStateAccessor stateAccessor, ITimeProvider timeProvider, IAccrualMapper accrualMapper, ILogger<RepoBase> logger)
    {
        _configuration = configuration;
        _mediator = mediator;
        _stateAccessor = stateAccessor;
        _timeProvider = timeProvider;
        _connection = sqlConnection;
        _accrualMapper = accrualMapper;
        _logger = logger;
    }

    /// <summary>
    /// Creates a new unit of work. Its constructor opens the connection immediately.
    /// </summary>
    /// <param name="useTransaction">
    /// When <c>true</c>, the unit of work begins a transaction on the connection.
    /// </param>
    /// <returns>The newly created unit of work; the caller is expected to dispose it.</returns>
    public IUnitOfWork Create(bool useTransaction)
    {
        return new UnitOfWork(_configuration, _connection, _mediator, _stateAccessor, _timeProvider, _accrualMapper, _logger, useTransaction);
    }
}
我们的 Startup.cs 文件以这种方式设置依赖注入:
// Register the unit-of-work factory and the database connection as transient services.
services
    .AddTransient<IUnitOfWorkFactory, UnitOfWorkFactory>()
    .AddTransient<IDbConnection, SqlConnection>();
我知道这段代码很长,但我们的 uow 如下所示。请注意,连接会在调用 CommitAsync() 之后以及在释放(Dispose)时关闭。
/// <summary>
/// Unit of work that opens the supplied connection in its constructor, optionally
/// begins a transaction on it, and passes both to lazily created repositories.
/// The connection is closed by <see cref="CommitAsync"/> and disposed by <see cref="Dispose()"/>.
/// </summary>
public class UnitOfWork : IUnitOfWork, IDisposable
{
    private readonly IConfiguration _configuration;
    private readonly IMediator _mediator;
    private readonly IStateAccessor _stateAccessor;
    private readonly ITimeProvider _timeProvider;
    private readonly IAccrualMapper _accrualMapper;
    private readonly ILogger<RepoBase> _logger;
    private readonly bool _useTransaction;
    private IDbConnection _connection;
    private IDbTransaction _transaction;
    private IAccrualRepo _accrualRepo;
    private bool _disposed;
    private bool _commitOccurred;

    /// <summary>
    /// Assigns the configured connection string to <paramref name="sqlConnection"/>, opens it
    /// and, when requested, begins a transaction.
    /// </summary>
    /// <param name="useTransaction">When <c>true</c>, a transaction is begun on the opened connection.</param>
    public UnitOfWork(IConfiguration configuration, IDbConnection sqlConnection, IMediator mediator,
        IStateAccessor stateAccessor, ITimeProvider timeProvider, IAccrualMapper accrualMapper,
        ILogger<RepoBase> logger, bool useTransaction = true)
    {
        _configuration = configuration;
        _mediator = mediator;
        _stateAccessor = stateAccessor;
        _timeProvider = timeProvider;
        _useTransaction = useTransaction;
        _accrualMapper = accrualMapper;
        _logger = logger;
        _connection = sqlConnection;
        _connection.ConnectionString = _configuration["ConnectionString"];

        try
        {
            _connection.Open();
            if (useTransaction)
            {
                _transaction = _connection.BeginTransaction();
            }
        }
        catch
        {
            // When a constructor throws, the caller's using block never receives an
            // instance, so Dispose() would never run. Release the connection here so
            // it is not left open.
            _connection.Dispose();
            _connection = null;
            throw;
        }
    }

    /// <summary>
    /// Accrual repository bound to this unit of work's connection and transaction.
    /// It is created on first access.
    /// </summary>
    public IAccrualRepo AccrualRepo
    {
        get => _accrualRepo ?? (_accrualRepo = new AccrualRepo(_configuration, _connection,
            _transaction, _stateAccessor, _timeProvider, _mediator, _logger));
        set => _accrualRepo = value;
    }

    /// <summary>
    /// Commits the transaction, runs the repositories' post-commit hooks, then disposes
    /// the transaction, closes the connection and resets the repositories.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The unit of work was created without a transaction, or its transaction has
    /// already been completed or disposed.
    /// </exception>
    public async Task CommitAsync()
    {
        if (!_useTransaction)
        {
            throw new InvalidOperationException("Attempting to call commit on a unit of work that isn't using a transaction");
        }

        if (_transaction == null)
        {
            throw new InvalidOperationException("Attempting to call commit on a unit of work whose transaction has already been completed or disposed");
        }

        try
        {
            _transaction.Commit();
            _commitOccurred = true;
            await InvokePostCommitOnReposAsync();
        }
        catch
        {
            // Only roll back when the commit itself failed. TryRollback logs instead of
            // throwing, so the original exception is the one the caller sees.
            if (!_commitOccurred)
            {
                TryRollback();
            }

            throw;
        }
        finally
        {
            try
            {
                // Clear the field so Dispose() does not roll back a finished transaction.
                _transaction.Dispose();
                _transaction = null;
            }
            finally
            {
                // Close the connection even if disposing the transaction failed.
                _connection.Close();
                ResetRepositories();
            }
        }
    }

    /// <summary>
    /// Invokes PostCommitAsync on every repository that was created. Failures are
    /// logged and not propagated.
    /// </summary>
    private async Task InvokePostCommitOnReposAsync()
    {
        var repos = new List<RepoBase>();
        if (_accrualRepo != null) { repos.Add((RepoBase)_accrualRepo); }

        try
        {
            foreach (var repo in repos)
            {
                await repo.PostCommitAsync();
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Exception occurred while invoking post commit on a repo.");
        }
    }

    /// <summary>Drops the cached repositories so later accesses create new ones.</summary>
    private void ResetRepositories()
    {
        _accrualRepo = null; // Note: there are more repos here, but removed for clarity.
    }

    /// <summary>Rolls the transaction back, logging (not throwing) any failure.</summary>
    private void TryRollback()
    {
        try
        {
            _transaction.Rollback();
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Exception occurred while rolling back the unit of work transaction.");
        }
    }

    /// <summary>
    /// Rolls back an uncommitted transaction and disposes the transaction and the connection.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this); // Already disposed; no need for the GC to finalize.
    }

    /// <summary>Releases the transaction and connection held by this unit of work.</summary>
    /// <param name="calledFromDisposeAndNotFromFinalizer">
    /// <c>true</c> when invoked from <see cref="Dispose()"/>; only then are the managed
    /// resources released.
    /// </param>
    protected virtual void Dispose(bool calledFromDisposeAndNotFromFinalizer)
    {
        if (_disposed) { return; }
        _disposed = true;

        if (calledFromDisposeAndNotFromFinalizer)
        {
            try
            {
                // If the user never called commit, but we are using a transaction, then roll back.
                if (!_commitOccurred && _useTransaction && _transaction != null) { TryRollback(); }
                if (_transaction != null) { _transaction.Dispose(); _transaction = null; }
            }
            finally
            {
                // Always dispose the connection, even when the transaction cleanup
                // above fails; otherwise it would not be returned to the pool.
                if (_connection != null) { _connection.Dispose(); _connection = null; }
            }
        }
    }
}
那么为什么我们会有这个连接池问题?这里有什么地方做错了吗?也许我们需要增加连接池大小?
连接池大小(Max Pool Size)是池中允许同时打开的最大连接数。例如,ADO.NET 的 SQL Server 提供程序的默认值为 100。如果同一时刻请求的连接数超过该数量,后来的请求必须等待之前的连接关闭并归还到池中;如果等待超时,就会抛出上面的错误。
如果可能同时收到大量消息,我建议增加连接池大小。
https://docs.microsoft.com/en-us/dotnet/framework/data/adonet/sql-server-connection-pooling
但是,如果您发现程序运行一段时间之后才出现此消息,则可能是代码有问题,部分连接没有被关闭。
当我们将许多消息放在总线上,并且它们在我们的业务逻辑中调用一个流程时,就会发生此错误:
The timeout period elapsed prior to obtaining a connection from the pool. This may have occurred because all pooled connections were in use and max pool size was reached.
当只有 15 条消息调用我们的流程时,不会发生这种情况。但是,当有 80 条或 130 条消息调用该流程时,确实会发生这种情况。
我们正在使用工作单元模式,连接在使用后会被关闭。所以我很难理解,为什么连接没有回到池中供下一个流程使用。
这是我们的应用程序中工作单元的使用方式:
// Create a transactional unit of work; leaving the using block disposes it.
using (var unitOfWork = _uowFactory.Create(true))
{
    // Stage the accrual histories through the repository, then commit them.
    var accrualRepo = unitOfWork.AccrualRepo;
    await accrualRepo.AddAccrualHistoriesAsync(histories);
    await unitOfWork.CommitAsync();
}
这是工厂创建并返回 uow 的方式:
/// <summary>
/// Creates <see cref="UnitOfWork"/> instances wired with the dependencies
/// that were injected into this factory.
/// </summary>
public class UnitOfWorkFactory : IUnitOfWorkFactory
{
    private readonly IConfiguration _configuration;
    private readonly IMediator _mediator;
    private readonly IStateAccessor _stateAccessor;
    private readonly ITimeProvider _timeProvider;

    // NOTE(review): this one connection instance is passed to every unit of work
    // created by this factory instance, and each unit of work disposes it. If a
    // single factory instance is used for more than one Create call (for example
    // because it is held by a longer-lived consumer), the units of work share and
    // re-open the same connection object - confirm the factory's lifetime at its
    // injection sites.
    private readonly IDbConnection _connection;

    private readonly IAccrualMapper _accrualMapper;
    private readonly ILogger<RepoBase> _logger;

    /// <summary>
    /// Stores the dependencies that are later forwarded to each created unit of work.
    /// </summary>
    public UnitOfWorkFactory(IConfiguration configuration, IDbConnection sqlConnection, IMediator mediator,
        IStateAccessor stateAccessor, ITimeProvider timeProvider, IAccrualMapper accrualMapper, ILogger<RepoBase> logger)
    {
        _configuration = configuration;
        _mediator = mediator;
        _stateAccessor = stateAccessor;
        _timeProvider = timeProvider;
        _connection = sqlConnection;
        _accrualMapper = accrualMapper;
        _logger = logger;
    }

    /// <summary>
    /// Creates a new unit of work. Its constructor opens the connection immediately.
    /// </summary>
    /// <param name="useTransaction">
    /// When <c>true</c>, the unit of work begins a transaction on the connection.
    /// </param>
    /// <returns>The newly created unit of work; the caller is expected to dispose it.</returns>
    public IUnitOfWork Create(bool useTransaction)
    {
        return new UnitOfWork(_configuration, _connection, _mediator, _stateAccessor, _timeProvider, _accrualMapper, _logger, useTransaction);
    }
}
我们的 Startup.cs 文件以这种方式设置依赖注入:
// Register the unit-of-work factory and the database connection as transient services.
services
    .AddTransient<IUnitOfWorkFactory, UnitOfWorkFactory>()
    .AddTransient<IDbConnection, SqlConnection>();
我知道这段代码很长,但我们的 uow 如下所示。请注意,连接会在调用 CommitAsync() 之后以及在释放(Dispose)时关闭。
/// <summary>
/// Unit of work that opens the supplied connection in its constructor, optionally
/// begins a transaction on it, and passes both to lazily created repositories.
/// The connection is closed by <see cref="CommitAsync"/> and disposed by <see cref="Dispose()"/>.
/// </summary>
public class UnitOfWork : IUnitOfWork, IDisposable
{
    private readonly IConfiguration _configuration;
    private readonly IMediator _mediator;
    private readonly IStateAccessor _stateAccessor;
    private readonly ITimeProvider _timeProvider;
    private readonly IAccrualMapper _accrualMapper;
    private readonly ILogger<RepoBase> _logger;
    private readonly bool _useTransaction;
    private IDbConnection _connection;
    private IDbTransaction _transaction;
    private IAccrualRepo _accrualRepo;
    private bool _disposed;
    private bool _commitOccurred;

    /// <summary>
    /// Assigns the configured connection string to <paramref name="sqlConnection"/>, opens it
    /// and, when requested, begins a transaction.
    /// </summary>
    /// <param name="useTransaction">When <c>true</c>, a transaction is begun on the opened connection.</param>
    public UnitOfWork(IConfiguration configuration, IDbConnection sqlConnection, IMediator mediator,
        IStateAccessor stateAccessor, ITimeProvider timeProvider, IAccrualMapper accrualMapper,
        ILogger<RepoBase> logger, bool useTransaction = true)
    {
        _configuration = configuration;
        _mediator = mediator;
        _stateAccessor = stateAccessor;
        _timeProvider = timeProvider;
        _useTransaction = useTransaction;
        _accrualMapper = accrualMapper;
        _logger = logger;
        _connection = sqlConnection;
        _connection.ConnectionString = _configuration["ConnectionString"];

        try
        {
            _connection.Open();
            if (useTransaction)
            {
                _transaction = _connection.BeginTransaction();
            }
        }
        catch
        {
            // When a constructor throws, the caller's using block never receives an
            // instance, so Dispose() would never run. Release the connection here so
            // it is not left open.
            _connection.Dispose();
            _connection = null;
            throw;
        }
    }

    /// <summary>
    /// Accrual repository bound to this unit of work's connection and transaction.
    /// It is created on first access.
    /// </summary>
    public IAccrualRepo AccrualRepo
    {
        get => _accrualRepo ?? (_accrualRepo = new AccrualRepo(_configuration, _connection,
            _transaction, _stateAccessor, _timeProvider, _mediator, _logger));
        set => _accrualRepo = value;
    }

    /// <summary>
    /// Commits the transaction, runs the repositories' post-commit hooks, then disposes
    /// the transaction, closes the connection and resets the repositories.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The unit of work was created without a transaction, or its transaction has
    /// already been completed or disposed.
    /// </exception>
    public async Task CommitAsync()
    {
        if (!_useTransaction)
        {
            throw new InvalidOperationException("Attempting to call commit on a unit of work that isn't using a transaction");
        }

        if (_transaction == null)
        {
            throw new InvalidOperationException("Attempting to call commit on a unit of work whose transaction has already been completed or disposed");
        }

        try
        {
            _transaction.Commit();
            _commitOccurred = true;
            await InvokePostCommitOnReposAsync();
        }
        catch
        {
            // Only roll back when the commit itself failed. TryRollback logs instead of
            // throwing, so the original exception is the one the caller sees.
            if (!_commitOccurred)
            {
                TryRollback();
            }

            throw;
        }
        finally
        {
            try
            {
                // Clear the field so Dispose() does not roll back a finished transaction.
                _transaction.Dispose();
                _transaction = null;
            }
            finally
            {
                // Close the connection even if disposing the transaction failed.
                _connection.Close();
                ResetRepositories();
            }
        }
    }

    /// <summary>
    /// Invokes PostCommitAsync on every repository that was created. Failures are
    /// logged and not propagated.
    /// </summary>
    private async Task InvokePostCommitOnReposAsync()
    {
        var repos = new List<RepoBase>();
        if (_accrualRepo != null) { repos.Add((RepoBase)_accrualRepo); }

        try
        {
            foreach (var repo in repos)
            {
                await repo.PostCommitAsync();
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Exception occurred while invoking post commit on a repo.");
        }
    }

    /// <summary>Drops the cached repositories so later accesses create new ones.</summary>
    private void ResetRepositories()
    {
        _accrualRepo = null; // Note: there are more repos here, but removed for clarity.
    }

    /// <summary>Rolls the transaction back, logging (not throwing) any failure.</summary>
    private void TryRollback()
    {
        try
        {
            _transaction.Rollback();
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Exception occurred while rolling back the unit of work transaction.");
        }
    }

    /// <summary>
    /// Rolls back an uncommitted transaction and disposes the transaction and the connection.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this); // Already disposed; no need for the GC to finalize.
    }

    /// <summary>Releases the transaction and connection held by this unit of work.</summary>
    /// <param name="calledFromDisposeAndNotFromFinalizer">
    /// <c>true</c> when invoked from <see cref="Dispose()"/>; only then are the managed
    /// resources released.
    /// </param>
    protected virtual void Dispose(bool calledFromDisposeAndNotFromFinalizer)
    {
        if (_disposed) { return; }
        _disposed = true;

        if (calledFromDisposeAndNotFromFinalizer)
        {
            try
            {
                // If the user never called commit, but we are using a transaction, then roll back.
                if (!_commitOccurred && _useTransaction && _transaction != null) { TryRollback(); }
                if (_transaction != null) { _transaction.Dispose(); _transaction = null; }
            }
            finally
            {
                // Always dispose the connection, even when the transaction cleanup
                // above fails; otherwise it would not be returned to the pool.
                if (_connection != null) { _connection.Dispose(); _connection = null; }
            }
        }
    }
}
那么为什么我们会有这个连接池问题?这里有什么地方做错了吗?也许我们需要增加连接池大小?
连接池大小(Max Pool Size)是池中允许同时打开的最大连接数。例如,ADO.NET 的 SQL Server 提供程序的默认值为 100。如果同一时刻请求的连接数超过该数量,后来的请求必须等待之前的连接关闭并归还到池中;如果等待超时,就会抛出上面的错误。
如果可能同时收到大量消息,我建议增加连接池大小。
https://docs.microsoft.com/en-us/dotnet/framework/data/adonet/sql-server-connection-pooling
但是,如果您发现程序运行一段时间之后才出现此消息,则可能是代码有问题,部分连接没有被关闭。