如何正确使用 Async、Await 和 ManualResetEvents 来控制无限循环

How to properly use Async, Await and ManualResetEvents to control an infinite while loop

所以我想在这里做的是:

  1. 如果队列不为空,则使引擎循环并处理对象。
  2. 如果队列为空,我调用 manualresetevent 使线程休眠。
  3. 当添加了一个项目并且循环未激活时,我设置了 manualresetevent。
  4. 为了让它更快,我从列表中选择最多 5 个项目并异步地对它们执行操作并等待它们全部完成。

问题:

  1. 一旦调用了对 AddToUpdateQueueMethod 的新调用,就会调用两个列表中的清除方法。
  2. 在我的脑海里,因为我正在等待 Task.WhenAll(任务),所以线程应该在继续之前等待它完成,因此只应在 Task.WhenAll 之后调用列表上的清除(任务)returns.

我在这里遗漏了什么,或者更好的方法是什么。

    public async Task ThumbnailUpdaterEngine()
    {
        int count;
        List<Task<bool>> tasks = new List<Task<bool>>();
        List<Content> candidateContents = new List<Content>();
        while (true)
        {

            for (int i = 0; i < 5; i++)
            {
                Content nextContent = GetNextFromInternalQueue();
                if (nextContent == null)
                    break;
                else
                    candidateContents.Add(nextContent);

            }

            foreach (var candidateContent in candidateContents)
            {
                foreach (var provider in interactionProviders)
                {
                    if (provider.IsServiceSupported(candidateContent.ServiceType))
                    {
                        Task<bool> task = provider.UpdateThumbnail(candidateContent);
                        tasks.Add(task);
                        break;
                    }
                }
            }
            var results = await Task.WhenAll(tasks);
            tasks.Clear();
            foreach (var candidateContent in candidateContents)
            {
                if (candidateContent.ThumbnailLink != null && !candidateContent.ThumbnailLink.Equals(candidateContent.FileIconLink, StringComparison.CurrentCultureIgnoreCase))
                {
                    Task<bool> task = DownloadAndUpdateThumbnailCache(candidateContent);
                    tasks.Add(task);
                }
            }
            await Task.WhenAll(tasks);

            //Clean up for next time the loop comes in.
            tasks.Clear();
            candidateContents.Clear();

            lock (syncObject)
            {
                count = internalQueue.Count;
                if (count == 0)
                {
                    isQueueControllerRunning = false;
                    monitorEvent.Reset();
                }
            }
            await Task.Run(() => monitorEvent.WaitOne());


        }
    }

    private Content GetNextFromInternalQueue()
    {
        lock (syncObject)
        {
            Content nextContent = null;
            if (internalQueue.Count > 0)
            {
                nextContent = internalQueue[0];
                internalQueue.Remove(nextContent);
            }
            return nextContent;
        }
    }

    public void AddToUpdateQueue(Content content)
    {
        lock (syncObject)
        {
            internalQueue.Add(content);
            if (!isQueueControllerRunning)
            {
                isQueueControllerRunning = true;
                monitorEvent.Set();
            }
        }
    }

您应该只使用 TPL 数据流。它是 TPL 之上的 actor 框架,具有 async 支持。使用带有 async 动作的 ActionBlock 和 5:

MaxDegreeOfParallelism
var block = new ActionBlock<Content>(
    async content => 
    {
        var tasks = interactionProviders.
            Where(provider => provider.IsServiceSupported(content.ServiceType)).
            Select(provider => provider.UpdateThumbnail(content));
        await Task.WhenAll(tasks);

        if (content.ThumbnailLink != null && !content.ThumbnailLink.Equals(
            content.FileIconLink, 
            StringComparison.CurrentCultureIgnoreCase))
        {
            await DownloadAndUpdateThumbnailCache(content);
        }
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5});

foreach (var content in GetContent())
{
    block.Post(content);
}

block.Complete();
await block.Completion