为什么 openmp 32 线程比 1 线程慢得多?

Why openmp 32 thread is much slower than 1 thread?

我正在尝试编写一个计算 2 个数组的 l2 范数的应用程序。我必须并行计算。

这是我并行化的代码:

  double time_start_openmp = omp_get_wtime();
  #pragma omp parallel for
  for (i = 0; i < n; i++)
  {
       numberOfThreads = omp_get_num_threads();
       double local_diff = x[i] - xseq[i];
       diff_vector[i] = local_diff;
       l2_norm += (local_diff * local_diff);
  }

   time_end_openmp = omp_get_wtime();

   l2_norm = sqrt(l2_norm);

   openmp_exec_time = time_end_openmp - time_start_openmp;
   printf("OPENMP: %d %ld %f %.12e\n", n, numberOfThreads, openmp_exec_time, l2_norm);

我将代码编译为:

gcc -fopenmp -g -ggdb -Wall -lm -o test test.c 

我运行这段代码有1个线程和32个线程。输出与预期的完全相反。这是一个示例输出:

[hayri@hayri-durmaz MatrixMultipication_MPI]$ export OMP_NUM_THREADS=32
[hayri@hayri-durmaz MatrixMultipication_MPI]$ ./test 10000
OPENMP: 10000 32 0.001084 0.000000000000e+00
[hayri@hayri-durmaz MatrixMultipication_MPI]$ export OMP_NUM_THREADS=1
[hayri@hayri-durmaz MatrixMultipication_MPI]$ ./test 10000
OPENMP: 10000 1 0.000106 0.000000000000e+00

我是不是看错了,还是使用 32 个线程比使用 1 个线程慢 10 倍?那么,我在这里做错了什么?

这是我的完整代码:

#include "mpi.h"
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <omp.h>
#include <math.h>

#define MATSIZE 2000

static size_t totalMemUsage = 0;

size_t vectors_dot_prod(double *x, double *y, size_t n)
{
    double res = 0.0;
    size_t i;
    for (i = 0; i < n; i++)
    {
        res += x[i] * y[i];
    }
    return res;
}

size_t vectors_dot_prod2(double *x, double *y, size_t n)
{
    size_t res = 0.0;
    size_t i = 0;
    for (; i <= n - 4; i += 4)
    {
        res += (x[i] * y[i] +
                x[i + 1] * y[i + 1] +
                x[i + 2] * y[i + 2] +
                x[i + 3] * y[i + 3]);
    }
    for (; i < n; i++)
    {
        res += x[i] * y[i];
    }
    return res;
}

void matrix_vector_mult(double **mat, double *vec, double *result, size_t rows, size_t cols)
{ // in matrix form: result = mat * vec;
    size_t i;
    for (i = 0; i < rows; i++)
    {
        result[i] = vectors_dot_prod2(mat[i], vec, cols);
    }
}

double get_random()
{

    double range = 1000;
    double div = RAND_MAX / range;
    double randomNumber = (rand() / div);
    // printf("%d\n", randomNumber);
    return randomNumber;
}

void print_2d_arr(double *arr, size_t row, size_t col)
{
    size_t i, j, index;

    for (i = 0; i < row; i++)
    {
        for (j = 0; j < col; j++)
        {
            index = i * col + j;
            printf("%3f ", arr[index]);
        }
        printf("\n");
    }
}
void print_1d_arr(double *arr, size_t row)
{
    size_t i;
    for (i = 0; i < row; i++)
    {
        printf("%f, ", arr[i]);
    }
    printf("\n");
}

size_t **fullfillArrayWithRandomNumbers(double *arr, size_t n)
{
    /*
    * Fulfilling the array with random numbers 
    * */
    size_t i;
    for (i = 0; i < n; i++)
    {
        arr[i] = get_random();
    }
    return 0;
}

double *allocarray1D(size_t size)
{
    double *array = calloc(size, sizeof(double));
    totalMemUsage = totalMemUsage + size * sizeof(double);
    return array;
}

size_t ParallelRowMatrixVectorMultiply(size_t n, double *a, double *b, double *x, MPI_Comm comm)
{
    size_t i, j;
    size_t nlocal;
    double *fb;
    int npes, myrank;
    MPI_Comm_size(comm, &npes);
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
    fb = (double *)malloc(n * sizeof(double));
    nlocal = n / npes;
    MPI_Allgather(b, nlocal, MPI_DOUBLE, fb, nlocal, MPI_DOUBLE, comm);
    for (i = 0; i < nlocal; i++)
    {
        x[i] = 0.0;
        for (j = 0; j < n; j++)
        {
            size_t index = i * n + j;
            x[i] += a[index] * fb[j];
        }
    }
    free(fb);
    return 0;
}

size_t ParallelRowMatrixVectorMultiply_WithoutAllgather(size_t n, double *a, double *b, double *x_partial, double *x, MPI_Comm comm)
{

    // Process 0 sends b to everyone
    MPI_Bcast(b, n, MPI_DOUBLE, 0, MPI_COMM_WORLD);

    size_t i, j;
    size_t nlocal;
    // double *fb;
    int npes, myrank;
    MPI_Comm_size(comm, &npes);
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
    // fb = (double *)malloc(n * sizeof(double));
    nlocal = n / npes;
    // MPI_Allgather(b, nlocal, MPI_DOUBLE, fb, nlocal, MPI_DOUBLE, comm);
    for (i = 0; i < nlocal; i++)
    {
        x_partial[i] = 0.0;
        for (j = 0; j < n; j++)
        {
            size_t index = i * n + j;
            // printf("%f x %f\n", a[index], b[j]);
            x_partial[i] += a[index] * b[j];
        }
    }
    // free(b);

    // Process 0 gathers x_partials to create x
    MPI_Gather(x_partial, nlocal, MPI_DOUBLE, x, nlocal, MPI_DOUBLE, 0, MPI_COMM_WORLD);
    return 0;
}

size_t SequentialMatrixMultiply(size_t n, double *a, double *b, double *x)
{
    size_t i, j;
    for (i = 0; i < n; i++)
    {
        x[i] = 0.0;
        for (j = 0; j < n; j++)
        {
            size_t index = i * n + j;
            // printf("%f x %f\n", a[index], b[j]);
            x[i] += a[index] * b[j];
        }
    }
    return 0;
}

int main(int argc, char *argv[])
{
    // Global declerations
    size_t i;
    // MPI_Status status;

    // Initialize the MPI environment
    MPI_Init(&argc, &argv);

    // Get the number of processes
    int world_size;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);

    // Get the rank of the process
    int taskid;
    MPI_Comm_rank(MPI_COMM_WORLD, &taskid);

    // Get the name of the processor
    char processor_name[MPI_MAX_PROCESSOR_NAME];
    int name_len;
    MPI_Get_processor_name(processor_name, &name_len);

    if (argc != 2)
    {
        if (taskid == 0)
            printf("Usage: %s <N>\n", argv[0]);
        MPI_Finalize();
        return 0;
    }
    srand(time(NULL) + taskid);
    size_t n = atoi(argv[1]);
    size_t nOverK = n / world_size;

    double *a = allocarray1D(n * n);
    double *b = allocarray1D(n);
    double *x = allocarray1D(n);
    double *x_partial = allocarray1D(nOverK);
    double *xseq = allocarray1D(n);

    double *a_partial = allocarray1D(n * nOverK);

    if (a == NULL || b == NULL || x == NULL || xseq == NULL || x_partial == NULL)
    {
        if (taskid == 0)
            printf("Allocation failed\n");
        MPI_Finalize();
        return 0;
    }
    // Process 0 creates A matrix.
    if (taskid == 0)
    {
        fullfillArrayWithRandomNumbers(a, n * n);
        // Process 0 produces the b
        fullfillArrayWithRandomNumbers(b, n);
    }

    // Process 0 sends a_partial to everyone
    if (!(world_size == 1 && n == 64000))
    {
        MPI_Scatter(a, n * nOverK, MPI_DOUBLE, a_partial, n * nOverK, MPI_DOUBLE, 0, MPI_COMM_WORLD);
    }

    MPI_Barrier(MPI_COMM_WORLD);
    double time_start = MPI_Wtime();
    ParallelRowMatrixVectorMultiply_WithoutAllgather(n, a_partial, b, x_partial, x, MPI_COMM_WORLD);
    double time_end = MPI_Wtime();
    double parallel_exec_time = time_end - time_start;

    double *exec_times = allocarray1D(world_size);
    // Process 0 gathers x_partials to create x
    MPI_Gather(&parallel_exec_time, 1, MPI_DOUBLE, exec_times, 1, MPI_DOUBLE, 0, MPI_COMM_WORLD);
    // print_1d_arr(x, n);

    if (taskid == 0)
    {
        SequentialMatrixMultiply(n, a, b, xseq);
        // check difference between x and xseq using OpenMP
        //print_1d_arr(exec_times, world_size);
        // print_1d_arr(xseq, n);
        double max_exec, min_exec, avg_exec;
        min_exec = 1000;
        for (i = 0; i < world_size; i++)
        {
            if (max_exec < exec_times[i])
            {
                max_exec = exec_times[i];
            }
            if (min_exec > exec_times[i])
            {
                min_exec = exec_times[i];
            }
            avg_exec += exec_times[i];
        }
        avg_exec = avg_exec / world_size;

        long double time_start_openmp = omp_get_wtime();
        long double time_end_openmp, openmp_exec_time, min_exec_time, max_exec_time, avg_exec_time;
        max_exec_time = 0;
        max_exec_time = 1000;
        long double l2_norm = 0;
        size_t numberOfThreads = 0;
        size_t r = 0;
        double *diff_vector = allocarray1D(n);
        size_t nrepeat = 10000;

        if (world_size == 1)
        {
            #pragma omp parallel
            {
                numberOfThreads = omp_get_num_threads();
                #pragma omp parallel for private(i)
                for (i = 0; i < n; i++)
                {
                    double local_diff = x[i] - xseq[i];
                    diff_vector[i] = local_diff;
                    l2_norm += (local_diff * local_diff);
                }
            }
        }
        else
        {
            #pragma omp parallel
            {
                numberOfThreads = omp_get_num_threads();
                #pragma omp parallel for private(i)
                for (i = 0; i < n; i++)
                {
                    double local_diff = x[i] - xseq[i];
                    diff_vector[i] = local_diff;
                    l2_norm += (local_diff * local_diff);
                }
            }
        }
        l2_norm = sqrt(l2_norm);
        time_end_openmp = omp_get_wtime();
        openmp_exec_time = time_end_openmp - time_start_openmp;
        // print matrix size, number of processors, number of threads, time, time_openmp, L2 norm of difference of x and xseq (use %.12e while printing norm)
        if (world_size == 1)
        {
            printf("OPENMP: %d %ld %Lf %.12e\n", n, numberOfThreads, openmp_exec_time, openmp_exec_time, l2_norm);
            printf("NEW_OPENMP: %d %ld %f %.12e\n", n, numberOfThreads, openmp_exec_time, l2_norm);
        }
        printf("MIN_AVG_MAX: %d %d %f %f %f\n", n, world_size, min_exec, max_exec, avg_exec);
        printf("MPI: %d %d %f %.12Lf %.12e\n", n, world_size, max_exec, l2_norm, l2_norm);
        totalMemUsage = totalMemUsage / (1024 * 1024 * 1024);
        printf("TOTALMEMUSAGE: %zu\n", totalMemUsage);

        //printf("process: %d %d %d %f %.12e\n", taskid, n, world_size, parallel_exec_time, l2_norm);
        //printf("%d %ld %f %.12e\n", n, numberOfThreads, openmp_exec_time, l2_norm);
    }
    MPI_Finalize();
    return 0;
}

这是输出;


cn009
36
mpicc -fopenmp -g -ggdb  -lm -o rowmv rowmv.c 


OPENMP: 32000 1 0.000299 2.991110086441e-04
MIN_AVG_MAX: 32000 1 3.112523 3.112523 3.112523
MPI: 32000 1 3.112523 0.000000000000 9.532824124368e-130
TOTALMEMUSAGE: 15


OPENMP: 32000 2 0.000535 5.350699648261e-04
MIN_AVG_MAX: 32000 1 3.125519 3.125519 3.125519
MPI: 32000 1 3.125519 0.000000000000 9.532824124368e-130
TOTALMEMUSAGE: 15


OPENMP: 32000 4 0.000434 4.341900348663e-04
MIN_AVG_MAX: 32000 1 3.170650 3.170650 3.170650
MPI: 32000 1 3.170650 0.000000000000 9.532824124368e-130
TOTALMEMUSAGE: 15


OPENMP: 32000 8 0.000454 4.542167298496e-04
MIN_AVG_MAX: 32000 1 3.168685 3.168685 3.168685
MPI: 32000 1 3.168685 0.000000000000 9.532824124368e-130
TOTALMEMUSAGE: 15


OPENMP: 32000 16 0.000507 5.065393634140e-04
MIN_AVG_MAX: 32000 1 3.158761 3.158761 3.158761
MPI: 32000 1 3.158761 0.000000000000 9.532824124368e-130
TOTALMEMUSAGE: 15


OPENMP: 32000 32 0.000875 8.752988651395e-04
MIN_AVG_MAX: 32000 1 3.166051 3.166051 3.166051
MPI: 32000 1 3.166051 0.000000000000 9.532824124368e-130
TOTALMEMUSAGE: 15

Am I seeing wrong or using 32 threads is 10 times slower than 1 thread? So, what am I doing wrong here?

在使用 OpenMP 分析和并行化的代码部分中:

 #pragma omp parallel
 {
    numberOfThreads = omp_get_num_threads();
    #pragma omp parallel for private(i)
    for (i = 0; i < n; i++)
    {
        double local_diff = x[i] - xseq[i];
        diff_vector[i] = local_diff;
        l2_norm += (local_diff * local_diff);
    }
 }

存在竞争条件,即对变量l2_norm的访问。此外,您可以删除 private(i),因为并行循环中的 index 变量( i)将由 OpenMP 隐式设置为 private。可以使用 OpenMP reduction 修复竞争条件。此外,您的循环实际上并未按照您的意愿在线程之间分配迭代。因为您再次向那个 #pragma omp for 添加了 parallel 子句,并且假设您禁用了嵌套并行性,默认情况下它是,在外部 parallel region“顺序”执行该区域内的代码,即:

    #pragma omp parallel for private(i)
    for (i = 0; i < n; i++)
    {
        double local_diff = x[i] - xseq[i];
        diff_vector[i] = local_diff;
        l2_norm += (local_diff * local_diff);
    }

因此,每个线程将执行您打算并行化的循环的所有 N 次迭代。因此,消除并行性并向顺序代码添加额外的开销(例如, 线程创建)。要解决这些问题(竞争条件和“嵌套”并行区域)将此代码更改为:

 #pragma omp parallel
 {
    numberOfThreads = omp_get_num_threads();
    #pragma omp for reduction(+:l2_norm)
    for (i = 0; i < n; i++)
    {
        double local_diff = x[i] - xseq[i];
        diff_vector[i] = local_diff;
        l2_norm += (local_diff * local_diff);
    }
 }

现在,解决了这些问题后,您仍然面临另一个问题(性能方面),即并行循环是在 OpenMP + MPI 和 [=63 的混合并行化的上下文中执行的=]您没有明确 将 OpenMP 线程(在 MPI 进程中)绑定到相应的核心。如果没有这种显式绑定,就无法确定这些线程将在哪个内核中结束。通常情况下,在同一个 逻辑 核心中拥有多个线程 运行 会增加并行化应用程序的整体执行。

如果您的应用程序使用线程,那么您可能希望确保您根本没有绑定(通过指定 --bind-to none),或者使用适当的绑定级别绑定到多个内核或特定数量的处理元素 per application process。您可以通过以下任一方法解决此问题:

  1. 禁用与 MPI 标志的绑定 --bind-to none,以允许将线程分配给不同的内核;
  2. 或相应地执行线程绑定。检查此 如何将线程映射到混合并行化中的核心,例如 MPI + OpenMP

通过相应地显式设置线程数 per process,您可以避免多个线程最终进入同一核心,从而避免同一核心中的线程 相同资源而战

建议:

IMO 你应该首先单独测试 OpenMP 的性能,没有任何 MPI 过程。在此上下文中,通过针对 2 线程测量顺序版本来测试代码的可伸缩性,然后是 48,依此类推,逐渐增加线程数。最终,将有许多线程的代码只是停止缩放。自然地,线程执行的并行工作量必须足够大以克服并行性的开销。因此,您还应该使用越来越大的输入进行测试。

分析、测试改进后的 OpenMP 版本后,您可以使用 MPI.

扩展共享内存与多个进程的并行化

除了@dreamcrash 的回答中指出的更新共享变量的竞争条件之外,您的代码没有正确分配工作。

#pragma omp parallel
{
    numberOfThreads = omp_get_num_threads();
    #pragma omp parallel for private(i)
                ~~~~~~~~
    for (i = 0; i < n; i++)
    {
        double local_diff = x[i] - xseq[i];
        diff_vector[i] = local_diff;
        l2_norm += (local_diff * local_diff);
    }
}

内部循环中的parallel构造使其成为嵌套组合并行for构造。这意味着执行外部并行循环的团队中的每个线程都会产生一个全新的并行区域,并将 i 循环分布在其中的线程上。 在外部并行区域 没有发生分布,你最终会得到 N 个线程,它们都在重复完全相同的工作。默认情况下禁用嵌套并行性,因此嵌套并行区域按顺序运行并且您的代码有效地执行此操作:

#pragma omp parallel
{
    numberOfThreads = omp_get_num_threads();
    for (i = 0; i < n; i++)
    {
        double local_diff = x[i] - xseq[i];
        diff_vector[i] = local_diff;
        l2_norm += (local_diff * local_diff);
    }
}

没有工作分配,所有线程写入 diff_vector[] 数组中的相同位置。

一方面,由于每字节数据的计算量很低,此代码通常是内存限制的代码 - 现代 CPUs 可以在从中获取数据时每个周期执行许多乘法和减法内存和写回结果需要很多周期。由于限制因素是内存带宽,内存限制问题不会随着线程的增加而变得更快。在你的情况下这不是什么大问题,因为 32K 数组条目占用 256 KB 内存并且适合大多数 CPU 缓存,并且 L3 缓存非常快,但仍然比最快的 L1 大单个 CPU 核心的缓存。另一方面,从多个线程写入相同的内存区域会导致 true 和 false 共享,以及相关的线程间缓存失效,这通常会导致并行代码 运行 比顺序版本慢得多。

有一些工具可以帮助您分析代码的性能并发现问题。正如我在评论中所写,英特尔 VTune 就是其中之一,并且作为 oneAPI 工具包的一部分免费提供。 Intel Inspector 是另一个工具(同样是免费的,并且是 oneAPI 工具包的一部分),它可以发现数据竞争等问题。这两个工具可以很好地协同工作,我强烈推荐它们给任何有抱负的并行程序员。

还有一个写入 numberOfThreads 的小竞争条件,但由于写入的所有值都是相同的,所以这不是一个逻辑问题。相关代码的正确版本应该是:

#pragma omp parallel
{
    #pragma omp master
    numberOfThreads = omp_get_num_threads();

    #pragma omp parallel reduction(+:l2_norm)
    for (i = 0; i < n; i++)
    {
        double local_diff = x[i] - xseq[i];
        diff_vector[i] = local_diff;
        l2_norm += (local_diff * local_diff);
    }
}