使用 SemaphoreSlim 进行分批执行

Using SemaphoreSlim for divide execution for batches

我正在从零开始学习异步编程,需要解决一个问题。我正在开发的应用程序必须按 ID 循环下载数据(大约 800 次循环)。每个 Loop pass 从数据库中获取 10 到 500 行,并生成一个包含行的 txt 文件。我想异步执行此操作。当然不想同时生成 800 个报告(800 sql 查询)但想将其分成几批。我使用 SemaphoreSlim:

public async void Generate(DateTime BusinessDate)
{
var throttler = new SemaphoreSlim(5);
                var allTasks = new List<Task>();
                foreach (var id in idsToGenerate)
                {
                    await throttler.WaitAsync();
                    allTasks.Add(Task.Run(async () =>
                    {          
                            GenerateReportForIdAsync(id, BusinessDate);
                    }));
                    throttler.Release();
                }
                
                await Task.WhenAll(allTasks);
}

private Task GenerateReportForIdAsync(Id id, DateTime day)
        {
            return Task.Run(() => GenerateReportForOid(id, day));
        }


private void GenerateReportForId(Id id, DateTime day)
        {
            LogInformation(); // shortcut
            GetDataFromDB(); // shortcut
            CreateReportFromRecordFromDB(); // shortcut
            UpdateInformationInDBThatReportHasBeenGenerated(); // shortcut
        }

此代码有效,因此已生成报告,但对于最后 1-10 份报告(取决于执行),方法 UpdateInformationInDBThatReportHasBeenGenerated() 似乎尚未 运行。所以看起来应用程序在执行某些记录的 UpdateInformationInDBThatReportHasBeenGenerated() 之前完成。

你知道为什么吗?

更新 这是有效的:

public void Generate(DateTime BusinessDate)
{
      var taskAsync = GenerateAsync(BusinessDate);
            Task.WaitAll(taskAsync);
}

private async Task GenerateAsync(DateTime BusinessDate)
{
    var ab = new ActionBlock<(Id id, DateTime businessDate)>(
        GenerateReportForId,
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 5,
        }
    );
    
    foreach (var id in idsToGenerate)
    {
        ab.Post((id, BusinessDate));
    }
    
    ab.Complete();
    
    await ab.Completion;
}

private void GenerateReportForId((Id id, DateTime day) arg)
{
    LogInformation(); // shortcut
    GetDataFromDB(); // shortcut
    CreateReportFromRecordFromDB(); // shortcut
    UpdateInformationInDBThatReportHasBeenGenerated(); // shortcut
}

您可以使用 ActionBlock<TInput> Class from Dataflow (Task Parallel Library) 轻松做到这一点:

public async Task Generate(DateTime BusinessDate)
{
    var ab = new ActionBlock<(Id id, DateTime businessDate)>(
        GenerateReportForId,
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 5,
        }
    );
    
    foreach (var id in idsToGenerate)
    {
        ab.Post((id, BusinessDate));
    }
    
    ab.Complete();
    
    await ab.Completion;
}

private void GenerateReportForId((Id id, DateTime day) arg)
{
    LogInformation(); // shortcut
    GetDataFromDB(); // shortcut
    CreateReportFromRecordFromDB(); // shortcut
    UpdateInformationInDBThatReportHasBeenGenerated(); // shortcut
}