为什么 Parallel.For 堆密集型操作不快?

Why isn't Parallel.For fast with heap-intensive operations?

对于某些操作,Parallel 可以很好地适应 CPU 的数量,但对于其他操作,则不能。

考虑下面的代码,function1 获得了 10 倍的改进,而 function2 获得了 3 倍的改进。这是由于内存分配,还是 GC?

void function1(int v) {
    for (int i = 0; i < 100000000; i++) {
        var q = Math.Sqrt(v);
    }
}
void function2(int v) {
    Dictionary<int, int> dict = new Dictionary<int, int>();
    for (int i = 0; i < 10000000; i++) {
        dict.Add(i, v);
    }
}
var sw = new System.Diagnostics.Stopwatch();

var iterations = 100;

sw.Restart();
for (int v = 0; v < iterations; v++) function1(v);
sw.Stop();
Console.WriteLine("function1 no parallel: " + sw.Elapsed.TotalMilliseconds.ToString("### ##0.0ms"));

sw.Restart();
Parallel.For(0, iterations, function1);
sw.Stop();
Console.WriteLine("function1 with parallel: " + sw.Elapsed.TotalMilliseconds.ToString("### ##0.0ms"));

sw.Restart();
for (int v = 0; v < iterations; v++) function2(v);
sw.Stop();
Console.WriteLine("function2 no parallel: " + sw.Elapsed.TotalMilliseconds.ToString("### ##0.0ms"));

sw.Restart();
Parallel.For(0, iterations, function2);
sw.Stop();
Console.WriteLine("function2 parallel: " + sw.Elapsed.TotalMilliseconds.ToString("### ##0.0ms"));


我机器上的输出:

function1   no parallel:  2 059,4 ms
function1 with parallel:    213,7 ms
function2   no parallel: 14 192,8 ms
function2      parallel:  4 491,1 ms

环境:
Win 11,.Net 6.0,发布版本
i9 第 12 代,16 核,24 处理器,32 GB DDR5


经过更多测试后,内存分配似乎不能很好地适应多线程。例如,如果我将函数 2 更改为:

void function2(int v) {
    Dictionary<int, int> dict = new Dictionary<int, int>(10000000);
}

结果是:

function2   no parallell:   124,0 ms
function2      parallell:   402,4 ms

结论是内存分配不能很好地扩展多线程吗?...

第一个函数在寄存器中工作。更多内核 = 更多寄存器。

第二个函数作用于内存。更多核心 = 只有更多 L1 缓存但共享 RAM。 1000 万个元素的数据集肯定只来自 RAM,因为即使是 L3 也不够大。这假设语言的 jit 将分配优化为重用缓冲区。如果不是,那么也会有分配开销。因此,您应该在每次新迭代时 re-use 字典,而不是重新创建。

您还使用增量整数索引保存数据。简单的数组可以在这里工作,当然在迭代之间使用 re-use 。它的内存占用应该比字典少。

tl;dr:堆分配争用。

您的第一个函数是 embarrassingly parallel。每个线程都可以完成自己的计算,而与其他线程的交互少得令人尴尬。所以它可以很好地扩展到多个线程。 huseyin tugrul buyukisik 正确地指出,您的第一个计算使用 non-shared,每个线程,处理器寄存器。

你的第二个函数,当它预分配字典时,在某种程度上不那么令人尴尬地并行。除了每个线程都使用您机器的 RAM 子系统之外,每个线程的计算都独立于其他线程。因此,您会在硬件级别看到一些 thread-to-thread 争用,因为 thread-level 缓存数据写入 machine-level RAM 并从中读取。

你的第二个不预分配内存的函数不是并行的。为什么不?每个 .Add() 操作必须在共享堆中分配一些数据。这不能并行完成,因为所有线程共享同一个堆。相反,它们必须同步。 dotnet 库在尽可能并行化堆操作方面做得很好,但是当线程 A 分配堆数据时,它们至少没有避免线程 B 的一些阻塞。所以线程会互相减慢速度。

单独的进程而不是单独的线程是扩展工作负载的好方法,例如 non-preallocating 第二个函数。每个进程都有自己的堆。

并行编程没那么简单。使用 Parallel.For() 或 Parallel.ForEach() 不会自动使您的程序并行。 并行编程不是调用任何更高级别的函数(在任何编程语言中)来使您的代码并行。是关于准备你的代码是并行的。

实际上,您根本没有并联任何东西,无论是 func1 还是 func2。 回到基础,并行的两种基本类型是:

按任务,您将一个复杂的任务拆分成更小的子任务,每个子任务同时处理不同的核心、CPUs 或节点(在计算机集群中)

通过数据,将一个大数据集分成几个较小的切片,每个切片同时处理不同的核心、CPUs 或节点

数据并行更难实现,并不总能带来真正的性能提升。

Func1 并不是真正的并行,它只是一个繁重的并发计算 运行ning。 (你的CPU只是在争论谁先完成100M的for循环) 使用 Parallel.For() 你只是在你的线程中产生了这个繁重的函数 100 次。 内部带有 Task.Run() 的单个 for 循环将具有几乎相同的结果

如果你的运行这个只有一个thread/core显然需要一些时间。如果你 运行 在你所有的核心中会更快。这里没什么大不了的,虽然是并发代码,但实际上并不是并行的。此外,调用这些任务 100 次,如果您没有这些数量的 CPU 核心(或集群中的节点),则没有太大区别,parallel/concurrent 代码将受到实际限制 CPU 机器中的内核(将在以后的示例中看到)

现在介绍 Func2 以及与内存堆的交互。是的,每一种带有 built-in GC 的现代语言都是 CPU 昂贵的。垃圾收集是复杂算法中最昂贵的操作之一,有时在 non-optimized 代码中它可以代表超过 90% 的 CPU 时间。

让我们分析一下你的功能2

  1. 在函数作用域中声明一个新的字典
  2. 用 1 亿个项目填充此词典
  3. 在范围之外,您在 Parallel.For 中调用了 function2 并进行了 100 次交互
  4. 100 个不同的范围用 100M 数据填充 100 个不同的词典
  5. 这些作用域之间没有相互作用

如前所述,这不是并行编程,这是并发编程。您在每个范围内都有 100M 条目的 100 个数据块,它们彼此不交互

但还有第二个因素。你的 function2 操作是一个写操作(这意味着你的 adding-updading-deleting 东西到一个集合)。好吧,如果它只是一堆随机数据,你可以承认一些损失和不一致。但是,如果您正在处理真实数据并且不允许任何类型的丢失或不一致,那就是坏消息。写入相同的内存地址(对象引用)没有真正的并行。您将需要一个同步上下文,这会使事情变得更慢,并且这些同步操作将始终是并发的,因为如果一个线程正在内存引用上写入,另一个线程必须等到另一个线程离开。实际上,使用多个线程写入数据可能会使您的代码变慢而不是变快,特别是如果并行操作不是 CPU-bound.

为了获得数据并行性的真正收益,您一定一直在对这些分区数据进行大量计算。

让我们根据您的方法检查下面的代码,但有一些更改:

var rand = new Random();
var operationSamples = 256;
var datasetSize = 100_000_000;
var computationDelay = 50;
var cpuCores = Environment.ProcessorCount;
Dictionary<int, int> datasetWithLoss = new(datasetSize);
Dictionary<int, int> dataset = new(datasetSize);
double result = 0;
Stopwatch sw = new();

ThreadPool.SetMinThreads(1, 1);

int HeavyComputation(int delay)
{
    int iterations = 0;
    var end = DateTime.Now + TimeSpan.FromMilliseconds(delay);
    while (DateTime.Now < end)
        iterations++;

    return iterations;
}

double SequentialMeanHeavyComputation(int maxMilliseconds, int samples = 64)
{
    double sum = 0;
    for (int i = 0; i < samples; i++)
        sum += HeavyComputation(maxMilliseconds);
    return sum / samples;
}

double ParallelMeanHeavyComputation(int maxSecondsCount, int samples = 64, int threads = 4)
{
    ThreadPool.SetMaxThreads(threads, threads);
    ThreadPool.GetAvailableThreads(out int workerThreads, out _);
    Console.WriteLine($"Available Threads: {workerThreads}");

    var _lockKey = new object();
    double sum = 0;
    int offset = samples / threads;
    List<Action> tasks = new();

    for (int i = 0; i < samples; i++)
        tasks.Add(new Action(() =>
        {
            var result = HeavyComputation(maxSecondsCount);
            lock (_lockKey)
                sum += result;
        }));

    Parallel.Invoke(new ParallelOptions { MaxDegreeOfParallelism = threads }, tasks.ToArray());

    return sum / samples;
}

void SequentialDatasetPopulation(int size)
{
    for (int i = 0; i < datasetSize; i++)
        dataset.TryAdd(i, Guid.NewGuid().GetHashCode());
}

void ParalellDatasetPopulation(int size, int threads)
{
    var _lock = new object();
    ThreadPool.SetMaxThreads(threads, threads);
    ThreadPool.GetAvailableThreads(out int workerThreads, out _);
    Console.WriteLine($"Available Threads: {workerThreads}");

    Parallel.For(0, datasetSize, new ParallelOptions { MaxDegreeOfParallelism = threads }, (i) =>
    {
        var value = Guid.NewGuid().GetHashCode();

        lock (_lock)
            dataset.Add(i, value);
    });
}

double SequentialReadOnlyDataset()
{
    foreach (var x in dataset)
    {
        HeavyComputation((int)Math.Tan(Math.Cbrt(Math.Log(Math.Log(x.Value)))) / 10);
    }

    return 0;
}

double ParallelReadOnlyDataset()
{
    Parallel.ForEach(dataset, x =>
    {
        HeavyComputation((int)Math.Tan(Math.Cbrt(Math.Log(Math.Log(x.Value)))) / 10);
    });
    return 0;
}

void ParalellDatasetWithLoss(int size, int threads)
{
    ThreadPool.SetMaxThreads(threads, threads);
    ThreadPool.GetAvailableThreads(out int workerThreads, out _);
    Console.WriteLine($"Available Threads: {workerThreads}");

    Parallel.For(0, datasetSize, new ParallelOptions { MaxDegreeOfParallelism = threads }, (i) =>
    {
        int value = Guid.NewGuid().GetHashCode();
        datasetWithLoss.Add(i, value);
    });
}

sw.Restart();
result = SequentialMeanHeavyComputation(computationDelay, operationSamples);
sw.Stop();
Console.WriteLine($"{nameof(SequentialMeanHeavyComputation)} sequential tasks: {sw.Elapsed.TotalMilliseconds.ToString("### ##0.0ms\n")}");

sw.Restart();
result = ParallelMeanHeavyComputation(computationDelay, operationSamples, threads: cpuCores);
sw.Stop();

Console.WriteLine($"{nameof(ParallelMeanHeavyComputation)} parallel tasks (CPU threads match count): {sw.Elapsed.TotalMilliseconds.ToString("### ##0.0ms\n")}");

sw.Restart();
result = ParallelMeanHeavyComputation(computationDelay, operationSamples, threads: 100);
sw.Stop();
Console.WriteLine($"{nameof(ParallelMeanHeavyComputation)} parallel tasks (Higher thread count): {sw.Elapsed.TotalMilliseconds.ToString("### ##0.0ms\n")}");

sw.Restart();
result = ParallelMeanHeavyComputation(computationDelay, operationSamples, threads: 4);
sw.Stop();
Console.WriteLine($"{nameof(ParallelMeanHeavyComputation)} parallel tasks (Lower thread count): {sw.Elapsed.TotalMilliseconds.ToString("### ##0.0ms\n")}");

sw.Restart();
SequentialDatasetPopulation(datasetSize);
sw.Stop();
Console.WriteLine($"{nameof(SequentialDatasetPopulation)} sequential data population: {sw.Elapsed.TotalMilliseconds.ToString("### ##0.0ms\n")}");

dataset.Clear();

sw.Restart();
ParalellDatasetPopulation(datasetSize, cpuCores);
sw.Stop();
Console.WriteLine($"{nameof(ParalellDatasetPopulation)} parallel data population: {sw.Elapsed.TotalMilliseconds.ToString("### ##0.0ms\n")}");

sw.Restart();
ParalellDatasetWithLoss(datasetSize, cpuCores);
sw.Stop();
Console.WriteLine($"{nameof(ParalellDatasetWithLoss)} parallel data with loss: {sw.Elapsed.TotalMilliseconds.ToString("### ##0.0ms\n")}");
Console.WriteLine($"Lossless dataset count: {dataset.Count}");
Console.WriteLine($"Dataset with loss: {datasetWithLoss.Count}\n");

datasetWithLoss.Clear();

sw.Restart();
SequentialReadOnlyDataset();
sw.Stop();
Console.WriteLine($"{nameof(SequentialReadOnlyDataset)} sequential reading operations: {sw.Elapsed.TotalMilliseconds.ToString("### ##0.0ms\n")}");

sw.Restart();
ParallelReadOnlyDataset();
sw.Stop();
Console.WriteLine($"{nameof(ParallelReadOnlyDataset)} parallel reading operations: {sw.Elapsed.TotalMilliseconds.ToString("### ##0.0ms\n")}");

Console.Read();

输出:

SequentialMeanHeavyComputation sequential tasks: 12 800,7ms

Available Threads: 15
ParallelMeanHeavyComputation parallel tasks (CPU threads match count):  860,3ms

Available Threads: 99
ParallelMeanHeavyComputation parallel tasks (Higher thread count):  805,0ms

Available Threads: 3
ParallelMeanHeavyComputation parallel tasks (Lower thread count): 3 200,4ms

SequentialDatasetPopulation sequential data population: 9 072,4ms

Available Threads: 15
ParalellDatasetPopulation parallel data population: 23 420,0ms

Available Threads: 15
ParalellDatasetWithLoss parallel data with loss: 6 788,3ms

Lossless dataset count: 100000000
Dataset with loss: 77057456

SequentialReadOnlyDataset sequential reading operations: 20 371,0ms

ParallelReadOnlyDataset parallel reading operations: 3 020,6ms

(红色:25%,橙色:56%,绿色:75%,蓝色:100%)

通过任务并行性,我们使用 100% 的 CPU 线程实现了超过 20 倍的性能。 (在这个例子中,并不总是这样)

在 read-only 数据并行与一些计算中,我们实现了近 6.5 倍的 CPU 使用率 56%(计算越少,差异会越小)

但是尝试实现数据的“真正并行”来写入我们的性能会慢两倍以上,而且 CPU 由于同步上下文

仅使用 25% 的使用率无法充分发挥潜力

结论: 使用 Parallel.For 并不能保证您的代码会 运行 真正地并行,也不会更快。它需要先前的 code/data 准备和深入分析、基准测试和调整

另请查看有关并行代码中反派的 Microsoft 文档

https://docs.microsoft.com/pt-br/dotnet/standard/parallel-programming/potential-pitfalls-in-data-and-task-parallelism