运行 并行异步任务和 return 导致 .NET Core Web API

Running parallel async tasks and return result in .NET Core Web API

您好,我最近在 .net core web api 项目中工作,该项目正在从外部 api 下载文件。 在此 .net 核心中 api 最近发现了一些问题,而文件数量超过 100 个。API 正在下载最多 50 个文件并跳过其他文件。 WebAPI 部署在 AWS Lambda 上,超时为 15mnts。

实际上由于下载过程较长,操作超时

public async Task<bool> DownloadAttachmentsAsync(List<DownloadAttachment> downloadAttachment)
        {
            try
            {
                bool DownloadFlag = false;

                foreach (DownloadAttachment downloadAttachment in downloadAttachments)
                {
                    DownloadFlag = await DownloadAttachment(downloadAttachment.id);

                    //update the download status in database
                    if(DownloadFlag)
                    {
                      bool UpdateFlag = await _DocumentService.UpdateDownloadStatus(downloadAttachment.id);

                      if (UpdateFlag)
                      {
                        await DeleteAttachment(downloadAttachment.id);
                      }
                   }
                }
                return true;
            }
            catch (Exception ext)
            {
                log.Error(ext, "Error in Saving attachment {attachemntId}",downloadAttachment.id);
                return false;
            }
        }

文档服务代码

public async Task<bool> UpdateAttachmentDownloadStatus(string AttachmentID)
        {
            return await _documentRepository.UpdateAttachmentDownloadStatus(AttachmentID);
        }

和数据库更新代码

public async Task<bool> UpdateAttachmentDownloadStatus(string AttachmentID)
        {
            using (var db = new SqlConnection(_connectionString.Value))
            {
                var Result = 0; bool SuccessFlag = false;
                var parameters = new DynamicParameters();
                parameters.Add("@pm_AttachmentID", AttachmentID);               
                parameters.Add("@pm_Result", Result, System.Data.DbType.Int32, System.Data.ParameterDirection.Output);
                var result = await db.ExecuteAsync("[Loan].[UpdateDownloadStatus]", parameters, commandType: CommandType.StoredProcedure);
                Result = parameters.Get<int>("@pm_Result");
                if (Result > 0) { SuccessFlag = true; }
                return SuccessFlag;
            }
        }

如何将此异步任务移动到 运行 并行?并得到结果?我尝试了以下代码

var task = Task.Run(() => DownloadAttachment( downloadAttachment.id));
bool result = task.Result; 

这个方法好吗?如何提高性能?如何从每个并行任务中获取结果并更新到数据库并根据成功标志删除?或者这个错误是由于 AWS 超时?

请帮忙

如果您将处理单个文件的代码提取到单独的方法中:

private async Task DownloadSingleAttachment(DownloadAttachment attachment)
{
    try
    {
        var download = await DownloadAttachment(downloadAttachment.id);
        if(download)
        {
            var update = await _DocumentService.UpdateDownloadStatus(downloadAttachment.id);
            if (update)
            {
                await DeleteAttachment(downloadAttachment.id);
            }
        }
    }
    catch(....)
    {
    ....
    }
}

public async Task<bool> DownloadAttachmentsAsync(List<DownloadAttachment> downloadAttachment)
{
    try
    {
      foreach (var attachment in downloadAttachments)
      {
          await DownloadSingleAttachment(attachment);
      }
    }
    ....
}

一次开始所有下载会很容易,虽然效率不高:

public async Task<bool> DownloadAttachmentsAsync(List<DownloadAttachment> downloadAttachment)
{

    try
    {
        //Start all of them
        var tasks=downloadAttachments.Select(att=>DownloadSingleAttachment(att));
        await Task.WhenAll(tasks);
    }
    ....
}

这不是很有效,因为外部服务像您一样讨厌来自单一来源的大量并发调用,并且几乎肯定会施加限制。数据库也不喜欢大量并发调用,因为 在所有数据库产品中 并发调用会以某种方式导致阻塞。即使在使用多版本的数据库中,这也会带来开销。

使用数据流类 - 单块

解决此问题的一种简单方法是使用 .NET 的 Dataflow 类 将操作分解为一系列步骤,并使用不同数量的并发任务执行每个步骤。

我们可以将整个操作放在一个块中,但如果更新和删除操作不是线程安全的,那可能会导致问题:

var dlOptions= new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 10,
};

var downloader=new ActionBlock<DownloadAttachment>(async att=>{
    await DownloadSingleAttachment(att);
},dlOptions);

foreach (var attachment in downloadAttachments)
{
    await downloader.SendAsync(attachement.id);
}

downloader.Complete();
await downloader.Completion;

数据流 - 多个步骤

为避免可能的线程问题,其余方法可以转到它们自己的块中。它们可以同时进入一个 ActionBlock,同时调用 UpdateDelete,或者如果这些方法与具有不同并发要求的不同服务对话,它们可以进入单独的块。

downloader 块将最多执行 10 个并发下载。默认情况下,每个块一次只使用一个任务。

updaterdeleter 块的默认 DOP=1,这意味着只要它们不尝试同时使用相同的连接,就没有竞争条件的风险时间。

var downloader=new TransformBlock<string,(string id,bool download)>(
    async id=> {
        var download=await DownloadAttachment(id);
        return (id,download);
},dlOptions);

var updater=new TransformBlock<(string id,bool download),(string id,bool update)>(
    async (id,download)=> {
        if(download)
        {
            var update = await _DocumentService.UpdateDownloadStatus(id);
            return (id,update);
        }
        return (id,false);
});

var deleter=new ActionBlock<(string id,bool update)>(
    async (id,update)=> {
        if(update)
        {
            await DeleteAttachment(id);
        }
});

这些块现在可以链接到管道中并使用。设置 PropagateCompletion = true 意味着一旦一个块完成处理,它就会告诉所有连接的块也完成:

var linkOptions=new DataflowLinkOptions { PropagateCompletion = true};
downloader.LinkTo(updater, linkOptions);
updater.LinkTo(deleter,linkOptions);

我们可以根据需要将数据泵入头部块。完成后,我们调用 head 块的 Complete() 方法。当每个块完成处理其数据时,它将其完成传播到管道中的下一个块。我们需要等待最后一个(尾部)块完成以确保所有附件都已处理:

foreach (var attachment in downloadAttachments)
{
    await downloader.SendAsync(attachement.id);
}

downloader.Complete();
await deleter.Completion;

每个块都有一个输入和(必要时)一个输出缓冲区,这意味着消息的“生产者”和“消费者”不必同步,甚至不必相互了解。所有“生产者”需要知道的是在管道中的何处找到头块。

节流和背压

一种限制方法是通过 MaxDegreeOfParallelism.

使用固定数量的任务

也可以限制输入缓冲区,从而在块不能足够快地处理消息时阻止前面的步骤或生产者。这可以简单地通过为块设置 BoundedCapacity option 来完成:

var dlOptions= new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 10,
    BoundedCapacity=20,
};

var updaterOptions= new ExecutionDataflowBlockOptions
{
    BoundedCapacity=20,
};

...

var downloader=new TransformBlock<...>(...,dlOptions);

var updater=new TransformBlock<...>(...,updaterOptions);

无需进行其他更改

要运行多个异步操作,你可以这样做:

    public async Task RunMultipleAsync<T>(IEnumerable<T> myList)
    {
        const int myNumberOfConcurrentOperations = 10;
        var mySemaphore = new SemaphoreSlim(myNumberOfConcurrentOperations);
        var tasks = new List<Task>();
        foreach(var myItem in myList)
        {
            await mySemaphore.WaitAsync();
            var task = RunOperation(myItem);
            tasks.Add(task);
            task.ContinueWith(t => mySemaphore.Release());           
        }

        await Task.WhenAll(tasks);
    }

    private async Task RunOperation<T>(T myItem)
    {
        // Do stuff
    }

将来自 DownloadAttachmentsAsync 的代码放在 'Do stuff' 评论

这将使用信号量来限制并发操作的数量,因为 运行 由于竞争,许多并发操作通常不是一个好主意。您需要通过试验来找到适合您的用例的最佳并发操作数。另请注意,为了使示例简短,省略了错误处理。