如何动态限制 Parallel.Foreach 创建的任务的并发执行

How to dynamically limit concurrent execution of tasks created by Parallel.Foreach

我正在创建一个针对一组资源执行命令的应用程序。例如,资源可以是服务器。例如,任务可以是 "ping" 或 "defragment database index"(这些只是示例,因为我无法揭示应用程序的真实性质)。 应用程序使用 TPL (Parallel.ForEach).

同时执行此命令

现在我已经到了必须限制命令对存储在公共资源上的资源的并发执行的地步。例如,这里有一个必须进行碎片整理的数据库索引列表。

Task# | Server | DB  | Index Name
---------------------------------
T1    | 1      | DB1 | A
T2    | 1      | DB1 | B
T3    | 1      | DB1 | C
T4    | 1      | DB2 | D
T5    | 2      | DB3 | E
T6    | 2      | DB3 | F
T7    | 3      | DB4 | G
T8    | 4      | DB5 | H
T9    | 6      | DB6 | I

为了防止服务器上出现大量并行磁盘I/O,我必须将索引碎片整理任务的执行限制为每台服务器一次。 这意味着任务 T7、T8 和 T9 可以同时 运行。只有任务 T1-T4 中的一个必须同时 运行。任务 T5 和 T6 相同。 例如,还应该可以将并发执行限制为每个资源 2 个任务。我怎样才能做到这一点?

我看过 TaskScheduler class,但 TaskScheduler 似乎无法拒绝任务(QueueTask 方法)。此外,自定义分区程序不会让我到达那里。

我能想到的唯一解决方案是创建我自己的 Parallel.ForEach 实现,它可以完全解决这个限制。

有没有更好的主意?

如上所述,这是一个示例用例。因此,请不要 post 回答如下:为此创建一个 SQL 代理作业。

这个例子不是最合适的。尝试同时对多个索引进行碎片整理将导致 更差 性能,因为碎片整理操作将竞争相同的 CPU 和磁盘资源。并发碎片整理操作 运行 更快的唯一方法是将每个索引存储在不同的磁盘(数组)中。

您不妨将所有碎片整理语句写在一个脚本中,然后 运行 它。

在一般情况下,您希望将一些 jobs/operations 排队以在有限的并行度下并发执行,合适的库是 TPL Dataflow and the ActionBlock class。 ActionBlock 允许您将 post 消息发送到块以供并发执行。默认情况下,一次只处理一条消息。您可以在创建块时简单地传递不同的 DOP 参数以处理更多消息。

var dopOptions=new ExecutionDataflowBlockOptions
               {
                   MaxDegreeOfParallelism = maxDop
               }
var defragBlock=new ActionBlock<string)(indexName=>MyDefragMethod(indexName), dopOptions);

//Post all the indexes. Only maxDop will be processed at a time
foreach(var indexName in indexesList)
{
    defragBlock.Post(indexName);
}

//Notify the block that we are done
defragBlock.Complete();

//and wait for all remaining indexes to finish
await defragBlock.Completion;

Parallel 方法不适合作业处理场景。它们用于 data 并行场景。它们旨在通过对数据进行分区并在每个分区处理一个任务来处理大量数据。分区数与核心数大致相同。

Dataflow 库允许您使用更高级的场景。例如,如果您定位 多个 服务器怎么办?在这种情况下,您可以 运行 同时在每台服务器上进行碎片整理操作。您可以通过为每个服务器创建不同的块,传递不同的连接字符串来做到这一点,例如

var block1=new ActionBlock<string)(indexName=>MyDefragMethod(indexName,connStr1), dopOptions);
var block2 =new ActionBlock<string)(indexName=>MyDefragMethod(indexName,connStr2), dopOptions);

foreach(var indexName in indexesList1)
{
    block1.Post(indexName);
}
foreach(var indexName in indexesList2)
{
    block2.Post(indexName);
}

//Notify the block that we are done
block1.Complete();
block2.Complete();

//and wait for all remaining indexes to finish
await Task.WhenAll(block1.Completion,block2.Completion);

您还可以将块存储在以服务器名称为关键字的字典中,这样您就可以 select 在循环中找到正确的块。