如何正确使用 Async、Await 和 ManualResetEvents 来控制无限循环
How to properly use Async, Await and ManualResetEvents to control an infinite while loop
所以我想在这里做的是:
- 如果队列不为空,则使引擎循环并处理对象。
- 如果队列为空,我调用 manualresetevent 使线程休眠。
- 当添加了一个项目并且循环未激活时,我设置了 manualresetevent。
- 为了让它更快,我从列表中选择最多 5 个项目并异步地对它们执行操作并等待它们全部完成。
问题:
- 一旦调用了对 AddToUpdateQueueMethod 的新调用,就会调用两个列表中的清除方法。
- 在我的脑海里,因为我正在等待 Task.WhenAll(任务),所以线程应该在继续之前等待它完成,因此只应在 Task.WhenAll 之后调用列表上的清除(任务)returns.
我在这里遗漏了什么,或者更好的方法是什么。
public async Task ThumbnailUpdaterEngine()
{
int count;
List<Task<bool>> tasks = new List<Task<bool>>();
List<Content> candidateContents = new List<Content>();
while (true)
{
for (int i = 0; i < 5; i++)
{
Content nextContent = GetNextFromInternalQueue();
if (nextContent == null)
break;
else
candidateContents.Add(nextContent);
}
foreach (var candidateContent in candidateContents)
{
foreach (var provider in interactionProviders)
{
if (provider.IsServiceSupported(candidateContent.ServiceType))
{
Task<bool> task = provider.UpdateThumbnail(candidateContent);
tasks.Add(task);
break;
}
}
}
var results = await Task.WhenAll(tasks);
tasks.Clear();
foreach (var candidateContent in candidateContents)
{
if (candidateContent.ThumbnailLink != null && !candidateContent.ThumbnailLink.Equals(candidateContent.FileIconLink, StringComparison.CurrentCultureIgnoreCase))
{
Task<bool> task = DownloadAndUpdateThumbnailCache(candidateContent);
tasks.Add(task);
}
}
await Task.WhenAll(tasks);
//Clean up for next time the loop comes in.
tasks.Clear();
candidateContents.Clear();
lock (syncObject)
{
count = internalQueue.Count;
if (count == 0)
{
isQueueControllerRunning = false;
monitorEvent.Reset();
}
}
await Task.Run(() => monitorEvent.WaitOne());
}
}
private Content GetNextFromInternalQueue()
{
lock (syncObject)
{
Content nextContent = null;
if (internalQueue.Count > 0)
{
nextContent = internalQueue[0];
internalQueue.Remove(nextContent);
}
return nextContent;
}
}
public void AddToUpdateQueue(Content content)
{
lock (syncObject)
{
internalQueue.Add(content);
if (!isQueueControllerRunning)
{
isQueueControllerRunning = true;
monitorEvent.Set();
}
}
}
您应该只使用 TPL 数据流。它是 TPL 之上的 actor 框架,具有 async
支持。使用带有 async
动作的 ActionBlock
和 5:
的 MaxDegreeOfParallelism
var block = new ActionBlock<Content>(
async content =>
{
var tasks = interactionProviders.
Where(provider => provider.IsServiceSupported(content.ServiceType)).
Select(provider => provider.UpdateThumbnail(content));
await Task.WhenAll(tasks);
if (content.ThumbnailLink != null && !content.ThumbnailLink.Equals(
content.FileIconLink,
StringComparison.CurrentCultureIgnoreCase))
{
await DownloadAndUpdateThumbnailCache(content);
}
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5});
foreach (var content in GetContent())
{
block.Post(content);
}
block.Complete();
await block.Completion
所以我想在这里做的是:
- 如果队列不为空,则使引擎循环并处理对象。
- 如果队列为空,我调用 manualresetevent 使线程休眠。
- 当添加了一个项目并且循环未激活时,我设置了 manualresetevent。
- 为了让它更快,我从列表中选择最多 5 个项目并异步地对它们执行操作并等待它们全部完成。
问题:
- 一旦调用了对 AddToUpdateQueueMethod 的新调用,就会调用两个列表中的清除方法。
- 在我的脑海里,因为我正在等待 Task.WhenAll(任务),所以线程应该在继续之前等待它完成,因此只应在 Task.WhenAll 之后调用列表上的清除(任务)returns.
我在这里遗漏了什么,或者更好的方法是什么。
public async Task ThumbnailUpdaterEngine()
{
int count;
List<Task<bool>> tasks = new List<Task<bool>>();
List<Content> candidateContents = new List<Content>();
while (true)
{
for (int i = 0; i < 5; i++)
{
Content nextContent = GetNextFromInternalQueue();
if (nextContent == null)
break;
else
candidateContents.Add(nextContent);
}
foreach (var candidateContent in candidateContents)
{
foreach (var provider in interactionProviders)
{
if (provider.IsServiceSupported(candidateContent.ServiceType))
{
Task<bool> task = provider.UpdateThumbnail(candidateContent);
tasks.Add(task);
break;
}
}
}
var results = await Task.WhenAll(tasks);
tasks.Clear();
foreach (var candidateContent in candidateContents)
{
if (candidateContent.ThumbnailLink != null && !candidateContent.ThumbnailLink.Equals(candidateContent.FileIconLink, StringComparison.CurrentCultureIgnoreCase))
{
Task<bool> task = DownloadAndUpdateThumbnailCache(candidateContent);
tasks.Add(task);
}
}
await Task.WhenAll(tasks);
//Clean up for next time the loop comes in.
tasks.Clear();
candidateContents.Clear();
lock (syncObject)
{
count = internalQueue.Count;
if (count == 0)
{
isQueueControllerRunning = false;
monitorEvent.Reset();
}
}
await Task.Run(() => monitorEvent.WaitOne());
}
}
private Content GetNextFromInternalQueue()
{
lock (syncObject)
{
Content nextContent = null;
if (internalQueue.Count > 0)
{
nextContent = internalQueue[0];
internalQueue.Remove(nextContent);
}
return nextContent;
}
}
public void AddToUpdateQueue(Content content)
{
lock (syncObject)
{
internalQueue.Add(content);
if (!isQueueControllerRunning)
{
isQueueControllerRunning = true;
monitorEvent.Set();
}
}
}
您应该只使用 TPL 数据流。它是 TPL 之上的 actor 框架,具有 async
支持。使用带有 async
动作的 ActionBlock
和 5:
MaxDegreeOfParallelism
var block = new ActionBlock<Content>(
async content =>
{
var tasks = interactionProviders.
Where(provider => provider.IsServiceSupported(content.ServiceType)).
Select(provider => provider.UpdateThumbnail(content));
await Task.WhenAll(tasks);
if (content.ThumbnailLink != null && !content.ThumbnailLink.Equals(
content.FileIconLink,
StringComparison.CurrentCultureIgnoreCase))
{
await DownloadAndUpdateThumbnailCache(content);
}
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5});
foreach (var content in GetContent())
{
block.Post(content);
}
block.Complete();
await block.Completion