使用 ParallelForEachAsync 并行处理不 运行

Process not running in parallel using ParallelForEachAsync

我正在通过 Process.Start 并行测试 运行ning python

我的机器有 2.8GHz CPU 4 核和 8 个逻辑处理器

我的主控制台应用程序如下

    static void Main(string[] args) => MainAsync(args).GetAwaiter().GetResult();

    static async Task MainAsync(string[] args)
    {            
        var startTime = DateTime.UtcNow;
        Console.WriteLine($"Execution started at {DateTime.UtcNow:T}");

        await ExecuteInParallelAsync(args).ConfigureAwait(false);
        Console.WriteLine($"Executions completed at {DateTime.UtcNow:T}");
        var endTime = DateTime.UtcNow;

        var duration = (endTime - startTime);
        Console.WriteLine($"Execution took {duration.TotalMilliseconds} milliseconds {duration.TotalSeconds} seconds");

        Console.WriteLine("Press Any Key to close");
        Console.ReadKey();            
    }

其中 ExecuteInParallelAsync 是完成工作的方法...

    private static async Task ExecuteInParallelAsync(string[] args)
    {
        var executionNumbers = new List<int>();
        var executions = 5;

        for (var executionNumber = 1; executionNumber <= executions; executionNumber++)
        {
            executionNumbers.Add(executionNumber);
        }

        await executionNumbers.ParallelForEachAsync(async executionNumber =>
        {
             Console.WriteLine($"Execution {executionNumber} of {executions} {DateTime.UtcNow:T}");
            ExecuteSampleModel();
            Console.WriteLine($"Execution {executionNumber} complete {DateTime.UtcNow:T}");
        }).ConfigureAwait(false);
    }

ExecuteSampleModel 运行是 Python 模型...

    IModelResponse GetResponse()
    { 
        _actualResponse = new ModelResponse();

        var fileName = $@"main.py";

        var p = new Process();
        p.StartInfo = new ProcessStartInfo(@"C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python36_64\python.exe", fileName)
        {
            WorkingDirectory = RootFolder,
            RedirectStandardOutput = true,
            UseShellExecute = false,
            CreateNoWindow = true
        };
        p.Start();

        _actualResponse.RawResponseFromModel = p.StandardOutput.ReadToEnd();
        p.WaitForExit();

        return _actualResponse;
    }

如你所见,我要求这个模型执行5次

当我使用调试器时,即使我使用的是 ParalellForEach(由 AsyncEnumerator 包引入),它似乎也不是 运行 并行

我以为每次迭代都是 运行 在它自己的线程上?

每个 Python 模型执行需要 5 秒。

运行 并行我希望整个过程在 15 秒左右完成,但实际上需要 34 秒

调用GetResponse前后添加的Console.WriteLines表示第一个调用开始,完整执行,然后第二个开始,等等

这和我打电话给Process.Start有关吗?

有人能看出这有什么问题吗?

保罗

为了使答案在这里有用,请解释异步代码发生了什么。省略许多从解释的角度来看并不那么重要的细节 ParallelForEachAsync 循环中的代码如下所示:

// some preparations
...
var itemIndex = 0L;
while (await enumerator.MoveNextAsync(cancellationToken).ConfigureAwait(false))
{
    ...
    Task itemActionTask = null;
    try
    {
        itemActionTask = asyncItemAction(enumerator.Current, itemIndex);
    }
    catch (Exception ex)
    {
       // some exception handling
    }
    ...
    itemIndex++;
}

其中 asyncItemAction 的类型为 Func<T, long, Task> 并且它是类型为 Func<T, Task> 的自定义异步操作的包装器,它作为参数传递给 ParallelForEachAsync 调用(包装器添加索引功能)。循环代码只是调用此操作以获得代表异步操作承诺等待其完成的任务。在给定代码示例的情况下,自定义操作

async executionNumber =>
{
     Console.WriteLine($"Execution {executionNumber} of {executions}{DateTime.UtcNow:T}");
     ExecuteSampleModel();
     Console.WriteLine($"Execution {executionNumber} complete {DateTime.UtcNow:T}");
}

不包含异步代码,但前缀 async 允许编译器使用 returns 一些 Task 方法生成状态机,这使得此代码(从语法的角度来看)与自定义操作兼容在循环内调用。 重要的是循环内的代码期望此操作是异步的,这意味着该操作隐式分为同步部分,将与 asyncItemAction(enumerator.Current, itemIndex) 调用和至少一个(一个或多个取决于 awaits inside) 可以在迭代其他循环项期间执行的异步部分。下面的伪代码给出了一个想法:

 {
     ... synchronous part
     await SomeAsyncOperation();
     ... asynchronous part
 } 

在这种特殊情况下,自定义操作中根本没有异步部分,因此意味着调用

 itemActionTask = asyncItemAction(enumerator.Current, itemIndex);

将同步执行,直到 asyncItemAction 完成整个自定义操作执行后才会开始循环内的下一次迭代。

这就是为什么关闭代码​​中的异步并使用简单的并行机制会有所帮助。