C# Parallel - 将项目添加到正在迭代的集合中,或等效?

C# Parallel - Adding items to the collection being iterated over, or equivalent?

现在,我有一个循环执行以下步骤的 C# 程序:

但是,其中一些任务很长-运行。这会延迟其他待处理任务的处理,因为我们只在程序开始时寻找新任务。

现在,我知道修改正在迭代的集合是不可能的(对吗?),但是 C# Parallel 框架中是否有一些等效的功能可以让我在处理项目的同时向列表添加工作在列表中?

这是您可以尝试的方法示例。我认为您想摆脱 Parallel.ForEaching 并使用异步编程来做一些事情,因为您需要在结果完成时检索结果,而不是在可能包含长 运行 任务和任务的离散块中很快就完成了。

此方法使用简单的顺序循环从异步任务列表中检索结果。在这种情况下,您应该安全地使用简单的非线程安全可变列表,因为列表的所有变更都在同一线程中按顺序发生。

请注意,此方法在循环中使用 Task.WhenAny,这对于大型任务列表来说效率不高,在这种情况下您应该考虑其他方法。 (查看此博客:http://blogs.msdn.com/b/pfxteam/archive/2012/08/02/processing-tasks-as-they-complete.aspx

此示例基于:https://msdn.microsoft.com/en-GB/library/jj155756.aspx

private async Task<ProcessResult> processTask(ProcessTask task) 
{
    // do something intensive with data
}

private IEnumerable<ProcessTask> GetOutstandingTasks() 
{
    // retreive some tasks from db
}

private void ProcessAllData()
{
    List<Task<ProcessResult>> taskQueue = 
        GetOutstandingTasks()
        .Select(tsk => processTask(tsk))
        .ToList(); // grab initial task queue

    while(taskQueue.Any()) // iterate while tasks need completing
    {
        Task<ProcessResult> firstFinishedTask = await Task.WhenAny(taskQueue); // get first to finish
        taskQueue.Remove(firstFinishedTask); // remove the one that finished
        ProcessResult result = await firstFinishedTask; // get the result
        // do something with task result
        taskQueue.AddRange(GetOutstandingTasks().Select(tsk => processData(tsk))) // add more tasks that need performing
    }
}

一般来说,您是对的,在迭代集合的同时修改集合是不允许的。但是您还可以使用其他方法:

  • 使用ActionBlock<T> from TPL Dataflow。代码可能类似于:

    var actionBlock = new ActionBlock<MyTask>(
        task => DoWorkForTask(task),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
    
    while (true)
    {
        var tasks = GrabCurrentListOfTasks();
        foreach (var task in tasks)
        {
            actionBlock.Post(task);
    
            await Task.Delay(someShortDelay);
            // or use Thread.Sleep() if you don't want to use async
        }
    }
    
  • 使用BlockingCollection<T>, which can be modified while consuming items from it, along with GetConsumingParititioner() from ParallelExtensionsExtras使其与Parallel.ForEach()一起工作:

    var collection = new BlockingCollection<MyTask>();
    
    Task.Run(async () =>
    {
        while (true)
        {
            var tasks = GrabCurrentListOfTasks();
            foreach (var task in tasks)
            {
                collection.Add(task);
    
                await Task.Delay(someShortDelay);
            }
        }
    });
    
    Parallel.ForEach(collection.GetConsumingPartitioner(), task => DoWorkForTask(task));