发生超时时,网络绑定 I/O 的并行化速度较慢

Slow parallelizing of network-bound I/O when timeouts occur

我正在并行化一个高度依赖 WinAPI NetAPI32 调用的方法。如果用户输入一个已关闭的主机或数百个列表中的多个主机,调用有时会超时。

int prevThreads, prevPorts;
ThreadPool.GetMinThreads(out prevThreads, out prevPorts);
ThreadPool.SetMinThreads(20, prevPorts);

var parallelScanList = computersToScan.AsParallel().WithExecutionMode(ParallelExecutionMode.ForceParallelism).WithDegreeOfParallelism(20);

Api.WinApi.AdvApi.LogonAndImpersonate(connection.UserCredential);

foreach (var computer in parallelScanList)
{
        //...
        //this takes a long time to timeout
        status = NetApi.NetUserEnum(computer.DnsHostname, 2,
                (int)NetApi.NetUserEnumFilter.FILTER_NORMAL_ACCOUNT,
                out userbufPtr, (int)LmCons.MAX_PREFERRED_LENGTH, out userEntriesRead, out totalEntries,
                out userResumeHandle);

}

我们在使用 consumer/producer 的 C 客户端中有类似的逻辑。启动 20 个线程并让它们读取一个列表,直到它被耗尽。

function StartProcessingHosts()
{
  for 1 to 20
     StartProcessThread()
}

function ProcessHostsThread()
{
  while(moreHosts)
  {
     //obviously synchronization around here
     var host = popHost();
     DoSomething(host);
  }
}

这非常快,因为所有这些网络调用都在等待,并且可能无法连接到已关闭的主机。

我目前在 C# 中的处理方式似乎是一次处理一个。

更新:

我明白了,问题出在 foreach 循环上。您可能假设通过创建查询 AsParallel 然后在 foreach 中执行它会使其并行。那当然不会发生。此外,使用 PLINQ,您可以实现与 .

中演示的相同的效果

但是,这是另一种并行化代码的方法,我在下面提到它,因为 svick 的回答也受到以下事实的影响,即仅通过设置 MaxDegreeOfParallelism = 20 不能保证 20 次并行执行。它仍然只是并行执行的上限,而不是下限。如果PLINQ执行引擎觉得应该只启动5个并行执行,它就只启动5个,那就是完全合法的执行。

以下代码保证 20 次并行执行:

var concurrentScanList = new ConcurrentQueue<Computer>(computersToScan);
var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
var taskArray = new Task[20];

//Initializing the tasks
for (var index = 0; index < taskArray.Length; index++)
{
    taskArray[index] = taskFactory.StartNew(() =>
    {
        Computer host;
        while (concurrentScanList.TryDequeue(out host))
        {
            DoSomething(host);
        }
    });
}

//Wait for all tasks to finish - queue will be empty then
Task.WaitAll(baseProcessorTaskArray);

旧答案:

WithDegreeOfParallelism() 是,

the maximum number of concurrently executing tasks that will be used to process the query.

...我的想法是,由于并发执行任务的最小数量不固定,可能是1。

从本质上讲,您的猜测可能是正确的,即此执行不是并行发生的,因此会超时。此外,即使并行发生,并行度等于20,也不能保证总是这样。

我的建议是将 "computers to scan" 放在 BlockingCollection 中,然后生成 20 个任务,每个任务从这个 BlockingCollection 读取一台计算机,然后扫描它。此实现自然是 Producer Consumer,因为这是问题设计的内在质量。

PLINQ,Parallel LINQ 的缩写,您猜对了,并行化 LINQ 查询。例如,如果您写 collection.AsParallel().Where(/* some condition */).Select(/* some projection */).ToList(),那么 Where()Select() 将并行执行。

但你不这样做,你打电话给 AsParallel(),说 "the following LINQ query should execute in parallel"。然后通过调用 WithExecutionMode()WithDegreeOfParallelism() 配置即将到来的查询的并行度。然后你实际上没有任何 LINQ 查询,而是使用 foreach,它将串行迭代集合。

如果你想并行执行一个foreach,你不需要PLINQ,你想要Parallel.ForEach():

Parallel.ForEach(computersToScan, new ParallelOptions { MaxDegreeOfParallelism = 20 },
    computer =>
    {
        //...
    });