Using SemaphoreSlim to split execution into batches
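I'm learning asynchronous programming from scratch and need to solve a problem. The application I'm developing has to download data in a loop by ID (about 800 iterations). Each iteration fetches between 10 and 500 rows from the database and generates a txt file containing those rows. I want to do this asynchronously. Of course, I don't want to generate all 800 reports (800 SQL queries) at the same time; I want to split the work into batches. I'm using SemaphoreSlim: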
public async void Generate(DateTime BusinessDate)
{
    var throttler = new SemaphoreSlim(5);
    var allTasks = new List<Task>();
    foreach (var id in idsToGenerate)
    {
        await throttler.WaitAsync();
        allTasks.Add(Task.Run(async () =>
        {
            GenerateReportForIdAsync(id, BusinessDate);
        }));
        throttler.Release();
    }
    await Task.WhenAll(allTasks);
}

private Task GenerateReportForIdAsync(Id id, DateTime day)
{
    return Task.Run(() => GenerateReportForId(id, day));
}

private void GenerateReportForId(Id id, DateTime day)
{
    LogInformation(); // shortcut
    GetDataFromDB(); // shortcut
    CreateReportFromRecordFromDB(); // shortcut
    UpdateInformationInDBThatReportHasBeenGenerated(); // shortcut
}
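This code works, in that the reports are generated, but for the last 1-10 reports (the number varies from run to run) the method UpdateInformationInDBThatReportHasBeenGenerated() does not appear to have run. So it looks like the application finishes before UpdateInformationInDBThatReportHasBeenGenerated() has executed for some records.
Do you know why?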
Update
This works:
public void Generate(DateTime BusinessDate)
{
    var taskAsync = GenerateAsync(BusinessDate);
    Task.WaitAll(taskAsync);
}

private async Task GenerateAsync(DateTime BusinessDate)
{
    var ab = new ActionBlock<(Id id, DateTime businessDate)>(
        GenerateReportForId,
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 5,
        }
    );
    foreach (var id in idsToGenerate)
    {
        ab.Post((id, BusinessDate));
    }
    ab.Complete();
    await ab.Completion;
}

private void GenerateReportForId((Id id, DateTime day) arg)
{
    LogInformation(); // shortcut
    GetDataFromDB(); // shortcut
    CreateReportFromRecordFromDB(); // shortcut
    UpdateInformationInDBThatReportHasBeenGenerated(); // shortcut
}
You can do this easily with the ActionBlock<TInput> class from Dataflow (Task Parallel Library):
public async Task Generate(DateTime BusinessDate)
{
    // Process at most 5 reports concurrently.
    var ab = new ActionBlock<(Id id, DateTime businessDate)>(
        GenerateReportForId,
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 5,
        }
    );
    foreach (var id in idsToGenerate)
    {
        ab.Post((id, BusinessDate));
    }
    // Signal that no more items will be posted, then wait for the queue to drain.
    ab.Complete();
    await ab.Completion;
}

private void GenerateReportForId((Id id, DateTime day) arg)
{
    LogInformation(); // shortcut
    GetDataFromDB(); // shortcut
    CreateReportFromRecordFromDB(); // shortcut
    UpdateInformationInDBThatReportHasBeenGenerated(); // shortcut
}
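For comparison, the SemaphoreSlim approach from the question can probably be made to work as well. As far as I can tell, the original snippet has three problems: Generate is async void, so the caller has no way to wait for it; the call to GenerateReportForIdAsync inside the lambda is never awaited, so each Task.Run wrapper completes before its report does; and throttler.Release() runs right after the task is queued rather than when it finishes, so nothing is really throttled. Below is a minimal sketch that addresses all three. The class name ThrottledReports, the int ids, the Completed bag, and the Thread.Sleep stand-in for the database work are assumptions made only so the example runs on its own; they are not from the question.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledReports
{
    // Hypothetical: records finished ids so the sketch runs without a database.
    public static readonly ConcurrentBag<int> Completed = new ConcurrentBag<int>();

    // Hypothetical stand-in for the question's synchronous GenerateReportForId.
    private static void GenerateReportForId(int id, DateTime day)
    {
        Thread.Sleep(10); // simulate blocking DB and file work
        Completed.Add(id);
    }

    // Returns Task instead of void so the caller can await completion.
    public static async Task GenerateAsync(
        IEnumerable<int> idsToGenerate, DateTime businessDate, int maxParallel = 5)
    {
        using (var throttler = new SemaphoreSlim(maxParallel))
        {
            var allTasks = new List<Task>();

            foreach (var id in idsToGenerate)
            {
                // Wait for a free slot before starting the next report.
                await throttler.WaitAsync();

                allTasks.Add(Task.Run(() =>
                {
                    try
                    {
                        GenerateReportForId(id, businessDate);
                    }
                    finally
                    {
                        // Free the slot only after this report has finished.
                        throttler.Release();
                    }
                }));
            }

            // Completes only once every report, including the last ones, is done.
            await Task.WhenAll(allTasks);
        }
    }
}
```

A caller would then write something like `await ThrottledReports.GenerateAsync(Enumerable.Range(1, 20), DateTime.Today);`. If the calling code cannot be made async, blocking on the returned task (as the update does with Task.WaitAll) at least keeps the application alive until all reports are written.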