使用异步时未处理 SqlConnection

SqlConnection not being disposed when using async

我有一个项目有一个 Sql-Server 数据库后端和 Dapper 作为 ORM。我正在尝试使用 Dapper 的 QueryAsync() 方法来获取一些数据。不仅如此,对我的 repo 的调用来自几个用 Task.WhenAll 调用的任务(也就是说,每个任务都涉及从该 repo 获取数据,因此每个任务都等待我的 repo 的包装方法QueryAsync() 调用)。

问题是即使我使用 using 块,我的 Sql 连接也永远不会关闭。结果,我有 100 多个打开的数据库连接,并最终开始出现 "max pool size reached" 异常。问题是,当我切换到 Query() 而不是 QueryAsync() 时,它工作正常,但我希望能够异步执行此操作。

这是一个代码示例。我尽可能地模仿实际应用程序的结构,这就是为什么它看起来比实际情况更复杂的原因。

接口:

public interface IFooRepository<T> where T: FooBase
{
    Task<IEnumerable<T>> Select(string account, DateTime? effectiveDate = null);
}

实施:

public class FooRepository : RepositoryBase, IFooRepository<SpecialFoo>
{
    private readonly IWebApiClientRepository _accountRepository;

    public FooRepository(IWebApiClientRepository repo)
    {
        _accountRepository = repo;
    }
    public async Task<IEnumerable<FuturePosition>> Select(string code, DateTime? effectiveDate = null)
    {
        effectiveDate = effectiveDate ?? DateTime.Today.Date;
        var referenceData =  await _accountRepository.GetCrossRefferenceData(code, effectiveDate.Value);
        using (var connection = new SqlConnection("iamaconnectionstring")
        {
            connection.Open();
            try
            {
                var res = await connection.QueryAsync<FuturePosition>(SqlQueryVariable + "AND t.code = @code;",
                    new
                    {
                        effectiveDate = effectiveDate.Value,
                        code = referenceData.Code
                    });

                foreach (var item in res)
                {
                    item.PropFromReference = referenceData.PropFromReference;
                }
                return res;
            }
            catch (Exception e)
            {
                //log 
                throw;
            }
            finally
            {
                connection.Close();
            }
        }
    }
}

所以现在有了调用代码,有2层。我将从外部开始。我认为这就是问题所在。下方有评论

人口:

public class Populator : PopulatorBase
{
    private IAccountRepository _acctRepository;
    public override async Task<IEnumerable<PopulationResult>> ProcessAccount(DateTime? popDate = null)
    {
        //My attempt at throttling the async calls
        //I was hoping this would force a max of 10 simultaneous connections.
        //It did not work.
        SemaphoreSlim ss = new SemaphoreSlim(10,10);
        var accountsToProcess = _acctRepository.GetAllAccountsToProcess();
        var accountNumbers = accountsToProcess.SelectMany(a => a.accountNumbers).ToList();

        List<Task<ProcessResult>> trackedTasks = new List<Task<ProcessResult>>();
        foreach (var item in accountNumbers)
        {
            await ss.WaitAsync();
            trackedTasks.Add(ProcessAccount(item.AccountCode, popDate ?? DateTime.Today));
            ss.Release();
        }
        //my gut tells me the issue is because of these tasks
        var results = await Task.WhenAll(trackedTasks);
        return results;
    }

    private async Task<ProcessResult>ProcessAccount(string accountCode, DateTime? popDate)
    {
        var createdItems = await _itemCreator.MakeExceptions(popDate, accountCode);
        return Populate(accountCode, createdItems);
    }
}

物品创造者:

public class ItemCreator : ItemCreatorBase
{
    private readonly IFooRepository<FuturePosition> _fooRepository;
    private readonly IBarRepository<FuturePosition> _barRepository;

    public RussellGlobeOpFutureExceptionCreator() )
    {
        //standard constructor stuff
    }
    public async Task<ItemCreationResult> MakeItems(DateTime? effectiveDate, string account)
    {
        DateTime reconDate = effectiveDate ?? DateTime.Today.Date;

        //this uses the repository I outlined above
        var foos = await _fooRepository.Select(account, effectiveDate);

        //this repository uses a rest client, I doubt it's the problem
        var bars = await _barRepository.Select(account, effectiveDate);

        //just trying to make this example less lengthy
        var foobars = MakeFoobars(foos, bars);
        var result = new ItemCreationResult { EffectiveDate = effectiveDate, Items = foobars };
        return result;
    }
}

就我的尝试而言:

值得一提的是,填充器中的 foreach 循环运行了大约 500 次。本质上,有一个包含 500 个帐户的列表。对于每一个,它都需要执行一项很长的 运行 populate 任务,其中涉及从我的 Foo 存储库中提取数据。

老实说我不知道​​。我认为这可能与等待来自填充器任务列表中每个任务的异步数据库调用有关。对此问题的任何见解都会非常有帮助。

经过一番挖掘,我想我设法找出了问题所在。我认为我实际上并没有像我最初假设的那样遇到连接泄漏。根据我现在的理解,使用连接池时,当 SQL 连接从代码中关闭时,它实际上并没有消失——它只是作为空闲连接进入连接池。查看 SQL 中打开的连接仍会显示它。

由于我的数据访问是异步的,所有连接都在任何 "closed" 连接返回池之前打开,这意味着每个请求都会打开一个新连接。这导致了我看到的惊人数量的打开连接,让我假设我有连接泄漏。

实际上使用 SemaphoreSlim 解决了这个问题——我只是错误地实现了它。它应该像这样工作:

public override async Task<IEnumerable<ProcessResult>> ProcessAccount(DateTime? popDate = null)
{
      foreach (item in accountNumbers)
      {

      trackedTasks.Add(new Func<Task<ProcessResult>>(async () =>
            {
                await ss.WaitAsync().ConfigureAwait(false);
                try
                {
                    return await ProcessAccount(item.AccountCode, popDate ?? DateTime.Today).ConfigureAwait(false);
                }
                catch (Exception e)
                {
                    //log, etc.
                }
                finally
                {
                    ss.Release();
                }
            })());
      }
}

这样做会限制一次打开的连接数量,并等待它们关闭,因此池中相同的较小连接组将被重新使用。