向量乘法中的 SIMD 与 OMP

SIMD vs OMP in vector multiplication

在我的项目中,我必须对 double *a-向量或 float *a-向量进行多次向量乘法运算。为了加快速度,我想使用 SIMD 操作或 omp。为了获得最快的结果,我写了一个基准程序:

#include <iostream>
#include <memory>
#include <vector>
#include <omp.h>
#include <immintrin.h>
#include <stdlib.h>
#include <chrono>


#define SIZE 32768
#define ROUNDS 1e5

void multiply_singular(float *a, float *b, float *d)
{
    for(int i = 0; i < SIZE; i++)
        d[i] = a[i]*b[i];
}

void multiply_omp(float *a, float *b, float *d)
{
#pragma omp parallel for
    for(int i = 0; i < SIZE; i++)
        d[i] = a[i]*b[i];
}

void multiply_avx(float *a, float *b, float *d)
{
    __m256 a_a, b_a, c_a;
    for(int i = 0; i < SIZE/8; i++)
    {
        a_a = _mm256_loadu_ps(a+8*i);
        b_a = _mm256_loadu_ps(b+8*i);
        c_a = _mm256_mul_ps(a_a, b_a);
        _mm256_storeu_ps (d+i*8, c_a);
    }
}

void multiply_avx_omp(float *a, float *b, float *d)
{
    __m256 a_a, b_a, c_a;
#pragma omp for
    for(int i = 0; i < SIZE/8; i++)
    {
        a_a = _mm256_loadu_ps(a+8*i);
        b_a = _mm256_loadu_ps(b+8*i);
        c_a = _mm256_mul_ps(a_a, b_a);
        _mm256_storeu_ps (d+i*8, c_a);
    }
}

void multiply_singular_double(double *a, double *b, double *d)
{
    for(int i = 0; i < SIZE; i++)
        d[i] = a[i]*b[i];
}

void multiply_omp_double(double *a, double *b, double *d)
{
#pragma omp parallel for
    for(int i = 0; i < SIZE; i++)
        d[i] = a[i]*b[i];
}

void multiply_avx_double(double *a, double *b, double *d)
{
    __m256d a_a, b_a, c_a;
    for(int i = 0; i < SIZE/4; i++)
    {
        a_a = _mm256_loadu_pd(a+4*i);
        b_a = _mm256_loadu_pd(b+4*i);
        c_a = _mm256_mul_pd(a_a, b_a);
        _mm256_storeu_pd (d+i*4, c_a);
    }
}

void multiply_avx_double_omp(double *a, double *b, double *d)
{
    __m256d a_a, b_a, c_a;
#pragma omp parallel for
    for(int i = 0; i < SIZE/4; i++)
    {
        a_a = _mm256_loadu_pd(a+4*i);
        b_a = _mm256_loadu_pd(b+4*i);
        c_a = _mm256_mul_pd(a_a, b_a);
        _mm256_storeu_pd (d+i*4, c_a);
    }
}


int main()
{
    float *a, *b, *c, *d, *e, *f;
    double *a_d, *b_d, *c_d, *d_d, *e_d, *f_d;
    a = new float[SIZE] {0};
    b = new float[SIZE] {0};
    c = new float[SIZE] {0};
    d = new float[SIZE] {0};
    e = new float[SIZE] {0};
    f = new float[SIZE] {0};
    a_d = new double[SIZE] {0};
    b_d = new double[SIZE] {0};
    c_d = new double[SIZE] {0};
    d_d = new double[SIZE] {0};
    e_d = new double[SIZE] {0};
    f_d = new double[SIZE] {0};
    for(int i = 0; i < SIZE; i++)
    {
        a[i] = i;
        b[i] = i;
        a_d[i] = i;
        b_d[i] = i;
    };
    std::cout << "Now doing the single float rounds!\n";
    std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
    for(int i = 0; i < ROUNDS; i++)
    {
        multiply_singular(a, b, c);
    }
    std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
    auto duration_ss = std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count();
    std::cout << "Now doing the omp float rounds!\n";
    t1 = std::chrono::high_resolution_clock::now();
    for(int i = 0; i < ROUNDS*10; i++)
    {
        multiply_omp(a, b, d);
    };
    t2 = std::chrono::high_resolution_clock::now();
    auto duration_so = std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count();
    std::cout << "Now doing the avx float rounds!\n";
    t1 = std::chrono::high_resolution_clock::now();
    for(int i = 0; i < ROUNDS*10; i++)
    {
        multiply_avx(a, b, e);
    };
    t2 = std::chrono::high_resolution_clock::now();
    auto duration_sa = std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count();
    std::cout << "Now doing the avx omp float rounds!\n";
    t1 = std::chrono::high_resolution_clock::now();
    for(int i = 0; i < ROUNDS*10; i++)
    {
        multiply_avx_omp(a, b, e);
    };
    t2 = std::chrono::high_resolution_clock::now();
    auto duration_sao = std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count();
    std::cout << "Now doing the single double rounds!\n";
    t1 = std::chrono::high_resolution_clock::now();
    for(int i = 0; i < ROUNDS; i++)
    {
        multiply_singular_double(a_d, b_d, c_d);
    };
    t2 = std::chrono::high_resolution_clock::now();
    auto duration_ds = std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count();
    std::cout << "Now doing the omp double rounds!\n";
    t1 = std::chrono::high_resolution_clock::now();
    for(int i = 0; i < ROUNDS*10; i++)
    {
        multiply_omp_double(a_d, b_d, d_d);
    };
    t2 = std::chrono::high_resolution_clock::now();
    auto duration_do = std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count();
    std::cout << "Now doing the avx double rounds!\n";
    t1 = std::chrono::high_resolution_clock::now();
    for(int i = 0; i < ROUNDS*10; i++)
    {
        multiply_avx_double(a_d, b_d, e_d);
    };
    t2 = std::chrono::high_resolution_clock::now();
    auto duration_da = std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count();
    std::cout << "Now doing the avx omp double rounds!\n";
    t1 = std::chrono::high_resolution_clock::now();
    for(int i = 0; i < ROUNDS*10; i++)
    {
        multiply_avx_double_omp(a_d, b_d, f_d);
    };
    t2 = std::chrono::high_resolution_clock::now();
    auto duration_dao = std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count();
    std::cout << "Finished\n";
    std::cout << "Elapsed time for functions:\n";
    std::cout << "Function\ttime[ms]\n";
    std::cout << "Singular float:\t" << duration_ss/ROUNDS << '\n';
    std::cout << "OMP float:\t" << duration_so/(ROUNDS*10) << '\n';
    std::cout << "AVX float avx:\t" << duration_sa/(ROUNDS*10) << '\n';
    std::cout << "OMP AVX float avx omp:\t" << duration_sao/(ROUNDS*10) << '\n';
    std::cout << "Singular double:\t" << duration_ds/ROUNDS << '\n';
    std::cout << "OMP double:\t" << duration_do/(ROUNDS*10) << '\n';
    std::cout << "AVX double:\t" << duration_da/(ROUNDS*10) << '\n';
    std::cout << "OMP AVX double:\t" << duration_dao/(ROUNDS*10) << '\n';
    delete[] a;
    delete[] b;
    delete[] c;
    delete[] d;
    delete[] e;
    delete[] f;
    delete[] a_d;
    delete[] b_d;
    delete[] c_d;
    delete[] d_d;
    delete[] e_d;
    delete[] f_d;
    return 0;
}

g++-5 -fopenmp -std=c++14 -march=native test_new.cpp -o test -lgomp 编译时,我得到

Elapsed time for functions:
Function    time[ms]
Singular float: 117.979
OMP float:  40.5385
AVX float avx:  60.2964
OMP AVX float avx omp:  61.4206
Singular double:    129.59
OMP double: 200.745
AVX double: 136.715
OMP AVX double: 122.176

或稍后运行

Elapsed time for functions:
Function    time[ms]
Singular float: 113.932
OMP float:  39.2581
AVX float avx:  58.3029
OMP AVX float avx omp:  60.0023
Singular double:    123.575
OMP double: 66.0327
AVX double: 124.293
OMP AVX double: 318.038

这里显然纯 omp-函数比其他函数更快,甚至与 AVX 函数一样。将 -O3-switch 添加到编译行时,我得到以下结果:

Elapsed time for functions:
Function    time[ms]
Singular float: 12.7361
OMP float:  4.82436
AVX float avx:  14.7514
OMP AVX float avx omp:  14.7225
Singular double:    27.9976
OMP double: 8.50957
AVX double: 32.5175
OMP AVX double: 257.219

这里再次 omp 明显快于其他所有方法,而 AVX 最慢,甚至比线性方法慢。这是为什么?是我的 AVX 函数实现很糟糕,还是有其他问题?

在 Ubuntu 14.04.1、i7 Sandy Bridge、gcc 版本 5.3.0 上执行。

编辑:我发现一个错误:我应该将 avx 函数中临时变量的声明移到 for 循环内,这让我接近 omp 级别(并提供正确的结果)。

编辑 2:当禁用 -O3-开关时,OMP-AVX-指令比 OMP-函数更快,它们是几乎持平

编辑 3:每次在执行下一个循环之前用随机数据填充数组时,我得到 (with -O3):

Elapsed time for functions:
Function    time[ms]
Singular float: 30.742
Cilk float: 24.0769
OMP float:  17.2415
AVX float avx:  33.0217
OMP AVX float avx omp:  10.1934
Singular double:    60.412
Cilk double:    34.6458
OMP double: 19.0739
AVX double: 66.8676
OMP AVX double: 22.3586

无:

Elapsed time for functions:
Function    time[ms]
Singular float: 274.402
Cilk float: 88.258
OMP float:  66.2124
AVX float avx:  117.066
OMP AVX float avx omp:  35.0313
Singular double:    238.652
Cilk double:    91.1667
OMP double: 127.621
AVX double: 249.516
OMP AVX double: 116.24

(我也添加了一个 cilk_for() 循环用于比较)。

更新: 我还添加了(如答案中所建议的)使用 #pragma omp parallel for simd 的函数。 结果是:

Elapsed time for functions:
Function                time[ms]
Singular float:         106.081
Cilk float:             33.2761
OMP float:              17.0651
AVX float avx:          65.1129
OMP AVX float:          19.1496
SIMD OMP float:         2.6095
Aligned AVX OMP float:  18.1165
Singular double:        118.939
Cilk double:            53.1102
OMP double:             35.652
AVX double:             131.24
OMP AVX double:         39.4377
SIMD OMP double:        7.0748
Aligned AVX OMP double: 38.4474

对于支持 OpenMP 的编译器4.x,您可能希望从这样的事情开始:

void multiply_singular_omp_for_simd(float *a, float *b, float *d)
{
    #pragma omp parallel for simd schedule (static,16)
    for(int i = 0; i < SIZE; i++)
        d[i] = a[i]*b[i];
}

它将为您提供 SIMD 和线程并行性。并行分解将自动完成,首先将并行 tasks/chunks 分布在 threads/cores 中,其次对每个 task/chunk 分布单独的迭代 "across" simd "lanes"。

如果您感到担心,请阅读给定的几篇文章: Threading and SIMD in OpenMP4, ICC documentation.

从形式上讲,您表达问题的方式有点模棱两可,因为从 4.0 开始,OMP 循环可能是 SIMD、Threading 或 SIMD+Threading parallel。所以这不再是关于 OMP 与 SIMD 的问题了。相反,它是关于 OMP SIMD 与 OMP 线程的对比。

不确定您给定的 GCC 实现有多好,但是 ICC/IFORT 现在可以在相对较长的时间内为 simd 处理 omp parallel。 GCC 也应该从 5.x 开始支持它(GCC 支持#pragma omp simd 有一段时间了,但是对于 #pragma omp parallel for simd 来说没有必要)。

为了优化编译器驱动的实现,理想情况下,您可能更喜欢进行缓存阻塞和手动拆分迭代 space 以使外层循环由 omp parallel for 驱动,而最内层循环由 omp simd 驱动。但这可能稍微超出了原始问题的范围。