Why would I run out of SQL connections using the unit of work pattern?

This error occurs when we put many messages on the bus and each of them invokes a process in our business logic:

The timeout period elapsed prior to obtaining a connection from the pool. This may have occurred because all pooled connections were in use and max pool size was reached.

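It does not happen when 15 messages invoke our process. It does happen, however, when 80 or 130 processes are invoked.

We are using the unit of work pattern, and the connection is closed after use. So I'm having a hard time understanding why it isn't available in the pool for the next process.

This is how the unit of work is used in our application: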

using (var uow = _uowFactory.Create(true))
{
    await uow.AccrualRepo.AddAccrualHistoriesAsync(histories);
    await uow.CommitAsync();
}

This is how the factory returns the uow:

public class UnitOfWorkFactory : IUnitOfWorkFactory
{
    private readonly IConfiguration _configuration;
    private readonly IMediator _mediator;
    private readonly IStateAccessor _stateAccessor;
    private readonly ITimeProvider _timeProvider;
    private readonly IDbConnection _connection;
    private readonly IAccrualMapper _accrualMapper;
    private readonly ILogger<RepoBase> _logger;

    public UnitOfWorkFactory(IConfiguration configuration, IDbConnection sqlConnection, IMediator mediator,
        IStateAccessor stateAccessor, ITimeProvider timeProvider, IAccrualMapper accrualMapper, ILogger<RepoBase> logger)
    {
        _configuration = configuration;
        _mediator = mediator;
        _stateAccessor = stateAccessor;
        _timeProvider = timeProvider;
        _connection = sqlConnection;
        _accrualMapper = accrualMapper;
        _logger = logger;
    }

    public IUnitOfWork Create(bool useTransaction)
    {
        return new UnitOfWork(_configuration, _connection, _mediator, _stateAccessor, _timeProvider, _accrualMapper, _logger, useTransaction);
    }
}

Our Startup.cs file sets up dependency injection this way:

services.AddTransient<IUnitOfWorkFactory, UnitOfWorkFactory>();
services.AddTransient<IDbConnection, SqlConnection>();

I know this is a lot of code, but our uow looks like this. Note that the connection is closed when CommitAsync() is called and again on dispose.

public class UnitOfWork : IUnitOfWork, IDisposable
{
    private readonly IConfiguration _configuration;
    private readonly IMediator _mediator;
    private readonly IStateAccessor _stateAccessor;
    private readonly ITimeProvider _timeProvider;
    private readonly IAccrualMapper _accrualMapper;
    private readonly ILogger<RepoBase> _logger;
    private IDbConnection _connection;
    private IDbTransaction _transaction;
    private IAccrualRepo _accrualRepo;
    private bool _disposed;
    private bool _commitOccurred;
    private bool _useTransaction;

    public UnitOfWork(IConfiguration configuration, IDbConnection sqlConnection, IMediator mediator, 
        IStateAccessor stateAccessor, ITimeProvider timeProvider, IAccrualMapper accrualMapper,
        ILogger<RepoBase> logger, bool useTransaction = true)
    {
        _configuration = configuration;
        _mediator = mediator;
        _stateAccessor = stateAccessor;
        _timeProvider = timeProvider;
        _useTransaction = useTransaction;
        _accrualMapper = accrualMapper;
        _logger = logger;
        _connection = sqlConnection;
        _connection.ConnectionString = _configuration["ConnectionString"];
        _connection.Open();

        if (useTransaction)
        {
            _transaction = _connection.BeginTransaction();
        }
    }

    public IAccrualRepo AccrualRepo
    {
        get => _accrualRepo ?? (_accrualRepo = new AccrualRepo(_configuration, _connection,
            _transaction, _stateAccessor, _timeProvider, _mediator, _logger));
        set => _accrualRepo = value;
    }

    public async Task CommitAsync()
    {
        if (!_useTransaction)
        {
            throw new InvalidOperationException("Attempting to call commit on a unit of work that isn't using a transaction");
        }

        try
        {
            _transaction.Commit();
            _commitOccurred = true;
            await InvokePostCommitOnReposAsync();
        }
        catch
        {
            _transaction.Rollback();
            throw;
        }
        finally
        {
            _connection.Close();
            _transaction.Dispose();
            ResetRepositories();
        }
    }

    private async Task InvokePostCommitOnReposAsync()
    {
        var repos = new List<RepoBase>();
        if (_accrualRepo != null) { repos.Add((RepoBase)_accrualRepo); }

        try
        {
            foreach (var repo in repos)
            {
                await repo.PostCommitAsync();
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Exception occurred while invoking post commit on a repo.");
        }
    }

    private void ResetRepositories()
    {
        _accrualRepo = null; // Note: there are more repos here, but removed for clarity.
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this); // Already disposed; no need for the GC to finalize.
    }

    protected virtual void Dispose(bool calledFromDisposeAndNotFromFinalizer)
    {
        if (_disposed) { return; }

        if (calledFromDisposeAndNotFromFinalizer)
        {
            // If the user never called commit, but we are using a transaction, then roll back.
            if (!_commitOccurred && _useTransaction && _transaction != null) { _transaction.Rollback(); }

            if (_transaction != null) { _transaction.Dispose(); _transaction = null; }
            if (_connection != null) { _connection.Dispose(); _connection = null; }
        }

        _disposed = true;
    }
}

So why are we having this connection pool problem? Is something being done wrong here? Maybe we need to increase the connection pool size?

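The connection pool size is the maximum number of concurrent connections allowed. For example, the default for SQL Server is 100. If more connections than that are requested at once, the extra requests have to wait for earlier connections to be closed, and they fail with the timeout error above if none is released in time.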
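To make that waiting behaviour concrete, here is a minimal sketch that does not touch SQL Server at all. `TinyPool`, `Lease`, and `PoolDemo` are hypothetical names I made up for illustration, and the semaphore is only a rough stand-in for what the real pool does internally.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a connection pool: a semaphore with one slot per connection.
public sealed class TinyPool
{
    private readonly SemaphoreSlim _slots;
    private readonly TimeSpan _timeout;

    public TinyPool(int maxPoolSize, TimeSpan timeout)
    {
        _slots = new SemaphoreSlim(maxPoolSize, maxPoolSize);
        _timeout = timeout;
    }

    // Waits for a free slot; throws if none frees up within the timeout.
    public async Task<IDisposable> OpenAsync()
    {
        if (!await _slots.WaitAsync(_timeout))
        {
            throw new TimeoutException("Timed out waiting for a pooled connection.");
        }
        return new Lease(_slots);
    }

    private sealed class Lease : IDisposable
    {
        private SemaphoreSlim _slots;
        public Lease(SemaphoreSlim slots) => _slots = slots;

        // Returns the slot to the pool exactly once.
        public void Dispose() => Interlocked.Exchange(ref _slots, null)?.Release();
    }
}

public static class PoolDemo
{
    // Starts `callers` concurrent units of work and returns how many timed out.
    public static async Task<int> CountTimeoutsAsync(int maxPoolSize, int callers)
    {
        var pool = new TinyPool(maxPoolSize, TimeSpan.FromMilliseconds(200));
        var failures = 0;

        async Task WorkAsync()
        {
            try
            {
                using (await pool.OpenAsync())
                {
                    await Task.Delay(1000); // Simulated slow work while holding the connection.
                }
            }
            catch (TimeoutException)
            {
                Interlocked.Increment(ref failures);
            }
        }

        var tasks = new Task[callers];
        for (var i = 0; i < callers; i++) { tasks[i] = WorkAsync(); }
        await Task.WhenAll(tasks);
        return failures;
    }
}
```

Calling `PoolDemo.CountTimeoutsAsync(2, 5)` should report three timeouts, because only two callers can hold a slot and the work outlasts the wait. With 15 messages against a pool of 100 you never hit the limit, but 130 concurrent messages that each hold a connection can.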

If you can have a lot of messages coming in at once, I would suggest increasing the connection pool size.

https://docs.microsoft.com/en-us/dotnet/framework/data/adonet/sql-server-connection-pooling
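The pool size is controlled by the `Max Pool Size` keyword in the connection string. Below is a small sketch of setting it in code; `PoolSizeConfig` and `WithMaxPoolSize` are hypothetical names, and I use the generic `DbConnectionStringBuilder` so the snippet runs without a SQL client package. If you reference SqlClient, `SqlConnectionStringBuilder.MaxPoolSize` does the same job with a typed property.

```csharp
using System;
using System.Data.Common;

public static class PoolSizeConfig
{
    // Returns a copy of the connection string with "Max Pool Size" set to the given value.
    public static string WithMaxPoolSize(string connectionString, int maxPoolSize)
    {
        var builder = new DbConnectionStringBuilder { ConnectionString = connectionString };
        builder["Max Pool Size"] = maxPoolSize; // Replaces any existing value for this keyword.
        return builder.ConnectionString;
    }
}
```

In the UnitOfWork constructor this could be applied as `_connection.ConnectionString = PoolSizeConfig.WithMaxPoolSize(_configuration["ConnectionString"], 200);`, where 200 is just an example value. You can also simply add `Max Pool Size=200;` to the configured connection string.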

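However, if you notice that you only start getting this message after the application has been running for a while, there may be a problem in the code where some connections are not being closed.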
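One rough way to check for that is to count opens against closes and log the difference. This is only a sketch: `OpenCounter` and `Track` are hypothetical names, not part of ADO.NET, and you would have to call `Track()` wherever the unit of work opens its connection and dispose the token wherever it closes it.

```csharp
using System;
using System.Threading;

// Hypothetical helper: counts how many tracked connections are open right now.
public sealed class OpenCounter
{
    private int _open;

    public int Open => Volatile.Read(ref _open);

    // Call when a connection is opened; dispose the returned token when it is closed.
    public IDisposable Track()
    {
        Interlocked.Increment(ref _open);
        return new Token(this);
    }

    private sealed class Token : IDisposable
    {
        private OpenCounter _owner;
        public Token(OpenCounter owner) => _owner = owner;

        public void Dispose()
        {
            // Decrement only once, even if Dispose is called repeatedly.
            var owner = Interlocked.Exchange(ref _owner, null);
            if (owner != null) { Interlocked.Decrement(ref owner._open); }
        }
    }
}
```

If `Open` keeps climbing over time and never drops back to zero when the bus is idle, some code path is not closing its connection. If it only spikes under load and then returns to zero, the pool is simply too small for the burst.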