向量乘法中的 SIMD 与 OMP
SIMD vs OMP in vector multiplication
在我的项目中,我必须对 double *a
-向量或 float *a
-向量进行多次向量乘法运算。为了加快速度,我想使用 SIMD 操作或 omp
。为了获得最快的结果,我写了一个基准程序:
#include <iostream>
#include <memory>
#include <vector>
#include <omp.h>
#include <immintrin.h>
#include <stdlib.h>
#include <chrono>
#define SIZE 32768
#define ROUNDS 1e5
/// Element-wise product of two float vectors: d[k] = a[k] * b[k].
/// Plain scalar reference implementation — no explicit SIMD, no threading.
/// All three buffers must hold at least SIZE floats.
void multiply_singular(float *a, float *b, float *d)
{
    for (int k = 0; k != SIZE; ++k)
    {
        *(d + k) = *(a + k) * *(b + k);
    }
}
/// Element-wise product of two float vectors: d[k] = a[k] * b[k].
/// Iterations are fully independent, so the loop is distributed over
/// OpenMP threads with the default schedule.
void multiply_omp(float *a, float *b, float *d)
{
#pragma omp parallel for
    for (int k = 0; k < SIZE; ++k)
    {
        d[k] = a[k] * b[k];
    }
}
/// Element-wise product of two float vectors using 256-bit AVX registers,
/// eight floats per iteration.  SIZE must be a multiple of 8 (it is 32768).
/// Unaligned loads/stores are used, so no alignment requirement on the
/// buffers themselves.
void multiply_avx(float *a, float *b, float *d)
{
    for (int off = 0; off < SIZE; off += 8)
    {
        const __m256 lhs = _mm256_loadu_ps(a + off);
        const __m256 rhs = _mm256_loadu_ps(b + off);
        _mm256_storeu_ps(d + off, _mm256_mul_ps(lhs, rhs));
    }
}
/// Element-wise product of two float vectors using AVX, with the iteration
/// space split across OpenMP threads (8 floats per iteration).
///
/// Fixes two defects in the original:
///  * `#pragma omp for` appeared outside any `parallel` region, so the loop
///    was never actually parallelized; it must be `#pragma omp parallel for`.
///  * the __m256 temporaries were declared outside the loop and were therefore
///    shared between threads under OpenMP's default data-sharing rules,
///    producing a data race (and wrong results).  They are now declared
///    inside the loop body, making them private to each iteration/thread.
void multiply_avx_omp(float *a, float *b, float *d)
{
#pragma omp parallel for
    for(int i = 0; i < SIZE/8; i++)
    {
        // per-iteration temporaries: each thread owns its own registers
        __m256 a_a = _mm256_loadu_ps(a+8*i);
        __m256 b_a = _mm256_loadu_ps(b+8*i);
        __m256 c_a = _mm256_mul_ps(a_a, b_a);
        _mm256_storeu_ps(d+i*8, c_a);
    }
}
/// Element-wise product of two double vectors: d[k] = a[k] * b[k].
/// Scalar baseline for the double-precision benchmarks.
void multiply_singular_double(double *a, double *b, double *d)
{
    for (int k = 0; k != SIZE; ++k)
    {
        *(d + k) = *(a + k) * *(b + k);
    }
}
/// Element-wise product of two double vectors: d[k] = a[k] * b[k].
/// Independent iterations distributed over OpenMP threads.
void multiply_omp_double(double *a, double *b, double *d)
{
#pragma omp parallel for
    for (int k = 0; k < SIZE; ++k)
    {
        d[k] = a[k] * b[k];
    }
}
/// Element-wise product of two double vectors using 256-bit AVX registers,
/// four doubles per iteration.  SIZE must be a multiple of 4 (it is 32768).
void multiply_avx_double(double *a, double *b, double *d)
{
    for (int off = 0; off < SIZE; off += 4)
    {
        const __m256d lhs = _mm256_loadu_pd(a + off);
        const __m256d rhs = _mm256_loadu_pd(b + off);
        _mm256_storeu_pd(d + off, _mm256_mul_pd(lhs, rhs));
    }
}
/// Element-wise product of two double vectors using AVX, with the iteration
/// space split across OpenMP threads (4 doubles per iteration).
///
/// Fixes a data race in the original: the __m256d temporaries were declared
/// outside the `#pragma omp parallel for` loop, so under OpenMP's default
/// data-sharing rules they were shared between all threads, and concurrent
/// writes could interleave (wrong results, erratic timings).  Declaring them
/// inside the loop body makes them private to each iteration/thread.
void multiply_avx_double_omp(double *a, double *b, double *d)
{
#pragma omp parallel for
    for(int i = 0; i < SIZE/4; i++)
    {
        // per-iteration temporaries: each thread owns its own registers
        __m256d a_a = _mm256_loadu_pd(a+4*i);
        __m256d b_a = _mm256_loadu_pd(b+4*i);
        __m256d c_a = _mm256_mul_pd(a_a, b_a);
        _mm256_storeu_pd(d+i*4, c_a);
    }
}
// Benchmark driver: allocates input/output buffers, fills the inputs with a
// deterministic ramp, times every multiply variant over many rounds, and
// prints the average per-call duration.
// NOTE(review): the table header says "time[ms]" but the printed values are
// microseconds per call (duration_cast<microseconds> divided by the round
// count) -- the label is misleading; confirm the intended unit.
int main()
{
// float buffers: a,b inputs; c,d,e outputs for the variants below
// (f is allocated but never written by any float benchmark)
float *a, *b, *c, *d, *e, *f;
// double buffers with the same roles (all four outputs c_d..f_d are used)
double *a_d, *b_d, *c_d, *d_d, *e_d, *f_d;
a = new float[SIZE] {0};
b = new float[SIZE] {0};
c = new float[SIZE] {0};
d = new float[SIZE] {0};
e = new float[SIZE] {0};
f = new float[SIZE] {0};
a_d = new double[SIZE] {0};
b_d = new double[SIZE] {0};
c_d = new double[SIZE] {0};
d_d = new double[SIZE] {0};
e_d = new double[SIZE] {0};
f_d = new double[SIZE] {0};
// identical ramp 0..SIZE-1 in both float and double input vectors
for(int i = 0; i < SIZE; i++)
{
a[i] = i;
b[i] = i;
a_d[i] = i;
b_d[i] = i;
};
// --- scalar float baseline (ROUNDS iterations) ---
std::cout << "Now doing the single float rounds!\n";
std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
for(int i = 0; i < ROUNDS; i++)
{
multiply_singular(a, b, c);
}
std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
auto duration_ss = std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count();
// --- OpenMP float (note: 10x the rounds of the scalar baseline) ---
std::cout << "Now doing the omp float rounds!\n";
t1 = std::chrono::high_resolution_clock::now();
for(int i = 0; i < ROUNDS*10; i++)
{
multiply_omp(a, b, d);
};
t2 = std::chrono::high_resolution_clock::now();
auto duration_so = std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count();
// --- AVX float ---
std::cout << "Now doing the avx float rounds!\n";
t1 = std::chrono::high_resolution_clock::now();
for(int i = 0; i < ROUNDS*10; i++)
{
multiply_avx(a, b, e);
};
t2 = std::chrono::high_resolution_clock::now();
auto duration_sa = std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count();
// --- AVX + OpenMP float.  NOTE(review): writes into e again, overwriting
// the plain-AVX result instead of using the otherwise-unused f buffer ---
std::cout << "Now doing the avx omp float rounds!\n";
t1 = std::chrono::high_resolution_clock::now();
for(int i = 0; i < ROUNDS*10; i++)
{
multiply_avx_omp(a, b, e);
};
t2 = std::chrono::high_resolution_clock::now();
auto duration_sao = std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count();
// --- scalar double baseline (ROUNDS iterations) ---
std::cout << "Now doing the single double rounds!\n";
t1 = std::chrono::high_resolution_clock::now();
for(int i = 0; i < ROUNDS; i++)
{
multiply_singular_double(a_d, b_d, c_d);
};
t2 = std::chrono::high_resolution_clock::now();
auto duration_ds = std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count();
// --- OpenMP double ---
std::cout << "Now doing the omp double rounds!\n";
t1 = std::chrono::high_resolution_clock::now();
for(int i = 0; i < ROUNDS*10; i++)
{
multiply_omp_double(a_d, b_d, d_d);
};
t2 = std::chrono::high_resolution_clock::now();
auto duration_do = std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count();
// --- AVX double ---
std::cout << "Now doing the avx double rounds!\n";
t1 = std::chrono::high_resolution_clock::now();
for(int i = 0; i < ROUNDS*10; i++)
{
multiply_avx_double(a_d, b_d, e_d);
};
t2 = std::chrono::high_resolution_clock::now();
auto duration_da = std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count();
// --- AVX + OpenMP double ---
std::cout << "Now doing the avx omp double rounds!\n";
t1 = std::chrono::high_resolution_clock::now();
for(int i = 0; i < ROUNDS*10; i++)
{
multiply_avx_double_omp(a_d, b_d, f_d);
};
t2 = std::chrono::high_resolution_clock::now();
auto duration_dao = std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count();
// report per-call averages; ROUNDS is the double literal 1e5, so these
// divisions are performed in double precision
std::cout << "Finished\n";
std::cout << "Elapsed time for functions:\n";
std::cout << "Function\ttime[ms]\n";
std::cout << "Singular float:\t" << duration_ss/ROUNDS << '\n';
std::cout << "OMP float:\t" << duration_so/(ROUNDS*10) << '\n';
std::cout << "AVX float avx:\t" << duration_sa/(ROUNDS*10) << '\n';
std::cout << "OMP AVX float avx omp:\t" << duration_sao/(ROUNDS*10) << '\n';
std::cout << "Singular double:\t" << duration_ds/ROUNDS << '\n';
std::cout << "OMP double:\t" << duration_do/(ROUNDS*10) << '\n';
std::cout << "AVX double:\t" << duration_da/(ROUNDS*10) << '\n';
std::cout << "OMP AVX double:\t" << duration_dao/(ROUNDS*10) << '\n';
// release all twelve heap buffers
delete[] a;
delete[] b;
delete[] c;
delete[] d;
delete[] e;
delete[] f;
delete[] a_d;
delete[] b_d;
delete[] c_d;
delete[] d_d;
delete[] e_d;
delete[] f_d;
return 0;
}
用 g++-5 -fopenmp -std=c++14 -march=native test_new.cpp -o test -lgomp
编译时,我得到
Elapsed time for functions:
Function time[ms]
Singular float: 117.979
OMP float: 40.5385
AVX float avx: 60.2964
OMP AVX float avx omp: 61.4206
Singular double: 129.59
OMP double: 200.745
AVX double: 136.715
OMP AVX double: 122.176
或稍后运行
Elapsed time for functions:
Function time[ms]
Singular float: 113.932
OMP float: 39.2581
AVX float avx: 58.3029
OMP AVX float avx omp: 60.0023
Singular double: 123.575
OMP double: 66.0327
AVX double: 124.293
OMP AVX double: 318.038
这里显然纯 omp
-函数比其他函数更快,甚至与 AVX 函数一样。将 -O3
-switch 添加到编译行时,我得到以下结果:
Elapsed time for functions:
Function time[ms]
Singular float: 12.7361
OMP float: 4.82436
AVX float avx: 14.7514
OMP AVX float avx omp: 14.7225
Singular double: 27.9976
OMP double: 8.50957
AVX double: 32.5175
OMP AVX double: 257.219
这里再次 omp
明显快于其他所有方法,而 AVX 最慢,甚至比线性方法慢。这是为什么?是我的 AVX 函数实现很糟糕,还是有其他问题?
在 Ubuntu 14.04.1、i7 Sandy Bridge、gcc 版本 5.3.0 上执行。
编辑:我发现一个错误:我应该将 avx
函数中临时变量的声明移到 for 循环内,这让我接近 omp
级别(并提供正确的结果)。
编辑 2:当禁用 -O3
-开关时,OMP
-AVX
-指令比 OMP
-函数更快,而启用该开关时两者几乎持平
编辑 3:每次在执行下一个循环之前用随机数据填充数组时,我得到 (with -O3
):
Elapsed time for functions:
Function time[ms]
Singular float: 30.742
Cilk float: 24.0769
OMP float: 17.2415
AVX float avx: 33.0217
OMP AVX float avx omp: 10.1934
Singular double: 60.412
Cilk double: 34.6458
OMP double: 19.0739
AVX double: 66.8676
OMP AVX double: 22.3586
无:
Elapsed time for functions:
Function time[ms]
Singular float: 274.402
Cilk float: 88.258
OMP float: 66.2124
AVX float avx: 117.066
OMP AVX float avx omp: 35.0313
Singular double: 238.652
Cilk double: 91.1667
OMP double: 127.621
AVX double: 249.516
OMP AVX double: 116.24
(我也添加了一个 cilk_for() 循环用于比较)。
更新:
我还添加了(如答案中所建议的)使用 #pragma omp parallel for simd
的函数。
结果是:
Elapsed time for functions:
Function time[ms]
Singular float: 106.081
Cilk float: 33.2761
OMP float: 17.0651
AVX float avx: 65.1129
OMP AVX float: 19.1496
SIMD OMP float: 2.6095
Aligned AVX OMP float: 18.1165
Singular double: 118.939
Cilk double: 53.1102
OMP double: 35.652
AVX double: 131.24
OMP AVX double: 39.4377
SIMD OMP double: 7.0748
Aligned AVX OMP double: 38.4474
对于支持 OpenMP 的编译器4.x,您可能希望从这样的事情开始:
// Element-wise float vector multiply combining OpenMP threading and SIMD
// vectorization via the OpenMP 4.x "parallel for simd" construct.
// schedule(static,16) hands out chunks of 16 iterations per thread; within
// each chunk the compiler vectorizes the iterations across SIMD lanes.
void multiply_singular_omp_for_simd(float *a, float *b, float *d)
{
#pragma omp parallel for simd schedule (static,16)
for(int i = 0; i < SIZE; i++)
d[i] = a[i]*b[i];
}
它将为您提供 SIMD 和线程两级并行。并行分解会自动完成:首先把并行任务/分块(tasks/chunks)分布到各线程/核心上,其次在每个任务/分块内部把各个迭代分布到 SIMD 通道(lanes)上。
如果您感到担心,请阅读给定的几篇文章:
Threading and SIMD in OpenMP4, ICC documentation.
从形式上讲,您表达问题的方式有点模棱两可,因为从 4.0 开始,OMP 循环可能是 SIMD、Threading 或 SIMD+Threading parallel。所以这不再是关于 OMP 与 SIMD 的问题了。相反,它是关于 OMP SIMD 与 OMP 线程的对比。
不确定您所用的 GCC 实现质量如何,但 ICC/IFORT 处理 omp parallel for simd 已经有相当长一段时间了。GCC 从 5.x 起也应该支持它(GCC 支持 #pragma omp simd 已有一段时间,但对 #pragma omp parallel for simd 的支持则不一定)。
为了优化编译器驱动的实现,理想情况下,您可能更喜欢进行缓存阻塞和手动拆分迭代 space 以使外层循环由 omp parallel for 驱动,而最内层循环由 omp simd 驱动。但这可能稍微超出了原始问题的范围。
在我的项目中,我必须对 double *a
-向量或 float *a
-向量进行多次向量乘法运算。为了加快速度,我想使用 SIMD 操作或 omp
。为了获得最快的结果,我写了一个基准程序:
#include <iostream>
#include <memory>
#include <vector>
#include <omp.h>
#include <immintrin.h>
#include <stdlib.h>
#include <chrono>
#define SIZE 32768
#define ROUNDS 1e5
/// Element-wise product of two float vectors: d[k] = a[k] * b[k].
/// Plain scalar reference implementation — no explicit SIMD, no threading.
/// All three buffers must hold at least SIZE floats.
void multiply_singular(float *a, float *b, float *d)
{
    for (int k = 0; k != SIZE; ++k)
    {
        *(d + k) = *(a + k) * *(b + k);
    }
}
/// Element-wise product of two float vectors: d[k] = a[k] * b[k].
/// Iterations are fully independent, so the loop is distributed over
/// OpenMP threads with the default schedule.
void multiply_omp(float *a, float *b, float *d)
{
#pragma omp parallel for
    for (int k = 0; k < SIZE; ++k)
    {
        d[k] = a[k] * b[k];
    }
}
/// Element-wise product of two float vectors using 256-bit AVX registers,
/// eight floats per iteration.  SIZE must be a multiple of 8 (it is 32768).
/// Unaligned loads/stores are used, so no alignment requirement on the
/// buffers themselves.
void multiply_avx(float *a, float *b, float *d)
{
    for (int off = 0; off < SIZE; off += 8)
    {
        const __m256 lhs = _mm256_loadu_ps(a + off);
        const __m256 rhs = _mm256_loadu_ps(b + off);
        _mm256_storeu_ps(d + off, _mm256_mul_ps(lhs, rhs));
    }
}
/// Element-wise product of two float vectors using AVX, with the iteration
/// space split across OpenMP threads (8 floats per iteration).
///
/// Fixes two defects in the original:
///  * `#pragma omp for` appeared outside any `parallel` region, so the loop
///    was never actually parallelized; it must be `#pragma omp parallel for`.
///  * the __m256 temporaries were declared outside the loop and were therefore
///    shared between threads under OpenMP's default data-sharing rules,
///    producing a data race (and wrong results).  They are now declared
///    inside the loop body, making them private to each iteration/thread.
void multiply_avx_omp(float *a, float *b, float *d)
{
#pragma omp parallel for
    for(int i = 0; i < SIZE/8; i++)
    {
        // per-iteration temporaries: each thread owns its own registers
        __m256 a_a = _mm256_loadu_ps(a+8*i);
        __m256 b_a = _mm256_loadu_ps(b+8*i);
        __m256 c_a = _mm256_mul_ps(a_a, b_a);
        _mm256_storeu_ps(d+i*8, c_a);
    }
}
/// Element-wise product of two double vectors: d[k] = a[k] * b[k].
/// Scalar baseline for the double-precision benchmarks.
void multiply_singular_double(double *a, double *b, double *d)
{
    for (int k = 0; k != SIZE; ++k)
    {
        *(d + k) = *(a + k) * *(b + k);
    }
}
/// Element-wise product of two double vectors: d[k] = a[k] * b[k].
/// Independent iterations distributed over OpenMP threads.
void multiply_omp_double(double *a, double *b, double *d)
{
#pragma omp parallel for
    for (int k = 0; k < SIZE; ++k)
    {
        d[k] = a[k] * b[k];
    }
}
/// Element-wise product of two double vectors using 256-bit AVX registers,
/// four doubles per iteration.  SIZE must be a multiple of 4 (it is 32768).
void multiply_avx_double(double *a, double *b, double *d)
{
    for (int off = 0; off < SIZE; off += 4)
    {
        const __m256d lhs = _mm256_loadu_pd(a + off);
        const __m256d rhs = _mm256_loadu_pd(b + off);
        _mm256_storeu_pd(d + off, _mm256_mul_pd(lhs, rhs));
    }
}
/// Element-wise product of two double vectors using AVX, with the iteration
/// space split across OpenMP threads (4 doubles per iteration).
///
/// Fixes a data race in the original: the __m256d temporaries were declared
/// outside the `#pragma omp parallel for` loop, so under OpenMP's default
/// data-sharing rules they were shared between all threads, and concurrent
/// writes could interleave (wrong results, erratic timings).  Declaring them
/// inside the loop body makes them private to each iteration/thread.
void multiply_avx_double_omp(double *a, double *b, double *d)
{
#pragma omp parallel for
    for(int i = 0; i < SIZE/4; i++)
    {
        // per-iteration temporaries: each thread owns its own registers
        __m256d a_a = _mm256_loadu_pd(a+4*i);
        __m256d b_a = _mm256_loadu_pd(b+4*i);
        __m256d c_a = _mm256_mul_pd(a_a, b_a);
        _mm256_storeu_pd(d+i*4, c_a);
    }
}
// Benchmark driver: allocates input/output buffers, fills the inputs with a
// deterministic ramp, times every multiply variant over many rounds, and
// prints the average per-call duration.
// NOTE(review): the table header says "time[ms]" but the printed values are
// microseconds per call (duration_cast<microseconds> divided by the round
// count) -- the label is misleading; confirm the intended unit.
int main()
{
// float buffers: a,b inputs; c,d,e outputs for the variants below
// (f is allocated but never written by any float benchmark)
float *a, *b, *c, *d, *e, *f;
// double buffers with the same roles (all four outputs c_d..f_d are used)
double *a_d, *b_d, *c_d, *d_d, *e_d, *f_d;
a = new float[SIZE] {0};
b = new float[SIZE] {0};
c = new float[SIZE] {0};
d = new float[SIZE] {0};
e = new float[SIZE] {0};
f = new float[SIZE] {0};
a_d = new double[SIZE] {0};
b_d = new double[SIZE] {0};
c_d = new double[SIZE] {0};
d_d = new double[SIZE] {0};
e_d = new double[SIZE] {0};
f_d = new double[SIZE] {0};
// identical ramp 0..SIZE-1 in both float and double input vectors
for(int i = 0; i < SIZE; i++)
{
a[i] = i;
b[i] = i;
a_d[i] = i;
b_d[i] = i;
};
// --- scalar float baseline (ROUNDS iterations) ---
std::cout << "Now doing the single float rounds!\n";
std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
for(int i = 0; i < ROUNDS; i++)
{
multiply_singular(a, b, c);
}
std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
auto duration_ss = std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count();
// --- OpenMP float (note: 10x the rounds of the scalar baseline) ---
std::cout << "Now doing the omp float rounds!\n";
t1 = std::chrono::high_resolution_clock::now();
for(int i = 0; i < ROUNDS*10; i++)
{
multiply_omp(a, b, d);
};
t2 = std::chrono::high_resolution_clock::now();
auto duration_so = std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count();
// --- AVX float ---
std::cout << "Now doing the avx float rounds!\n";
t1 = std::chrono::high_resolution_clock::now();
for(int i = 0; i < ROUNDS*10; i++)
{
multiply_avx(a, b, e);
};
t2 = std::chrono::high_resolution_clock::now();
auto duration_sa = std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count();
// --- AVX + OpenMP float.  NOTE(review): writes into e again, overwriting
// the plain-AVX result instead of using the otherwise-unused f buffer ---
std::cout << "Now doing the avx omp float rounds!\n";
t1 = std::chrono::high_resolution_clock::now();
for(int i = 0; i < ROUNDS*10; i++)
{
multiply_avx_omp(a, b, e);
};
t2 = std::chrono::high_resolution_clock::now();
auto duration_sao = std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count();
// --- scalar double baseline (ROUNDS iterations) ---
std::cout << "Now doing the single double rounds!\n";
t1 = std::chrono::high_resolution_clock::now();
for(int i = 0; i < ROUNDS; i++)
{
multiply_singular_double(a_d, b_d, c_d);
};
t2 = std::chrono::high_resolution_clock::now();
auto duration_ds = std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count();
// --- OpenMP double ---
std::cout << "Now doing the omp double rounds!\n";
t1 = std::chrono::high_resolution_clock::now();
for(int i = 0; i < ROUNDS*10; i++)
{
multiply_omp_double(a_d, b_d, d_d);
};
t2 = std::chrono::high_resolution_clock::now();
auto duration_do = std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count();
// --- AVX double ---
std::cout << "Now doing the avx double rounds!\n";
t1 = std::chrono::high_resolution_clock::now();
for(int i = 0; i < ROUNDS*10; i++)
{
multiply_avx_double(a_d, b_d, e_d);
};
t2 = std::chrono::high_resolution_clock::now();
auto duration_da = std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count();
// --- AVX + OpenMP double ---
std::cout << "Now doing the avx omp double rounds!\n";
t1 = std::chrono::high_resolution_clock::now();
for(int i = 0; i < ROUNDS*10; i++)
{
multiply_avx_double_omp(a_d, b_d, f_d);
};
t2 = std::chrono::high_resolution_clock::now();
auto duration_dao = std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count();
// report per-call averages; ROUNDS is the double literal 1e5, so these
// divisions are performed in double precision
std::cout << "Finished\n";
std::cout << "Elapsed time for functions:\n";
std::cout << "Function\ttime[ms]\n";
std::cout << "Singular float:\t" << duration_ss/ROUNDS << '\n';
std::cout << "OMP float:\t" << duration_so/(ROUNDS*10) << '\n';
std::cout << "AVX float avx:\t" << duration_sa/(ROUNDS*10) << '\n';
std::cout << "OMP AVX float avx omp:\t" << duration_sao/(ROUNDS*10) << '\n';
std::cout << "Singular double:\t" << duration_ds/ROUNDS << '\n';
std::cout << "OMP double:\t" << duration_do/(ROUNDS*10) << '\n';
std::cout << "AVX double:\t" << duration_da/(ROUNDS*10) << '\n';
std::cout << "OMP AVX double:\t" << duration_dao/(ROUNDS*10) << '\n';
// release all twelve heap buffers
delete[] a;
delete[] b;
delete[] c;
delete[] d;
delete[] e;
delete[] f;
delete[] a_d;
delete[] b_d;
delete[] c_d;
delete[] d_d;
delete[] e_d;
delete[] f_d;
return 0;
}
用 g++-5 -fopenmp -std=c++14 -march=native test_new.cpp -o test -lgomp
编译时,我得到
Elapsed time for functions:
Function time[ms]
Singular float: 117.979
OMP float: 40.5385
AVX float avx: 60.2964
OMP AVX float avx omp: 61.4206
Singular double: 129.59
OMP double: 200.745
AVX double: 136.715
OMP AVX double: 122.176
或稍后运行
Elapsed time for functions:
Function time[ms]
Singular float: 113.932
OMP float: 39.2581
AVX float avx: 58.3029
OMP AVX float avx omp: 60.0023
Singular double: 123.575
OMP double: 66.0327
AVX double: 124.293
OMP AVX double: 318.038
这里显然纯 omp
-函数比其他函数更快,甚至与 AVX 函数一样。将 -O3
-switch 添加到编译行时,我得到以下结果:
Elapsed time for functions:
Function time[ms]
Singular float: 12.7361
OMP float: 4.82436
AVX float avx: 14.7514
OMP AVX float avx omp: 14.7225
Singular double: 27.9976
OMP double: 8.50957
AVX double: 32.5175
OMP AVX double: 257.219
这里再次 omp
明显快于其他所有方法,而 AVX 最慢,甚至比线性方法慢。这是为什么?是我的 AVX 函数实现很糟糕,还是有其他问题?
在 Ubuntu 14.04.1、i7 Sandy Bridge、gcc 版本 5.3.0 上执行。
编辑:我发现一个错误:我应该将 avx
函数中临时变量的声明移到 for 循环内,这让我接近 omp
级别(并提供正确的结果)。
编辑 2:当禁用 -O3
-开关时,OMP
-AVX
-指令比 OMP
-函数更快,而启用该开关时两者几乎持平
编辑 3:每次在执行下一个循环之前用随机数据填充数组时,我得到 (with -O3
):
Elapsed time for functions:
Function time[ms]
Singular float: 30.742
Cilk float: 24.0769
OMP float: 17.2415
AVX float avx: 33.0217
OMP AVX float avx omp: 10.1934
Singular double: 60.412
Cilk double: 34.6458
OMP double: 19.0739
AVX double: 66.8676
OMP AVX double: 22.3586
无:
Elapsed time for functions:
Function time[ms]
Singular float: 274.402
Cilk float: 88.258
OMP float: 66.2124
AVX float avx: 117.066
OMP AVX float avx omp: 35.0313
Singular double: 238.652
Cilk double: 91.1667
OMP double: 127.621
AVX double: 249.516
OMP AVX double: 116.24
(我也添加了一个 cilk_for() 循环用于比较)。
更新:
我还添加了(如答案中所建议的)使用 #pragma omp parallel for simd
的函数。
结果是:
Elapsed time for functions:
Function time[ms]
Singular float: 106.081
Cilk float: 33.2761
OMP float: 17.0651
AVX float avx: 65.1129
OMP AVX float: 19.1496
SIMD OMP float: 2.6095
Aligned AVX OMP float: 18.1165
Singular double: 118.939
Cilk double: 53.1102
OMP double: 35.652
AVX double: 131.24
OMP AVX double: 39.4377
SIMD OMP double: 7.0748
Aligned AVX OMP double: 38.4474
对于支持 OpenMP 的编译器4.x,您可能希望从这样的事情开始:
// Element-wise float vector multiply combining OpenMP threading and SIMD
// vectorization via the OpenMP 4.x "parallel for simd" construct.
// schedule(static,16) hands out chunks of 16 iterations per thread; within
// each chunk the compiler vectorizes the iterations across SIMD lanes.
void multiply_singular_omp_for_simd(float *a, float *b, float *d)
{
#pragma omp parallel for simd schedule (static,16)
for(int i = 0; i < SIZE; i++)
d[i] = a[i]*b[i];
}
它将为您提供 SIMD 和线程两级并行。并行分解会自动完成:首先把并行任务/分块(tasks/chunks)分布到各线程/核心上,其次在每个任务/分块内部把各个迭代分布到 SIMD 通道(lanes)上。
如果您感到担心,请阅读给定的几篇文章: Threading and SIMD in OpenMP4, ICC documentation.
从形式上讲,您表达问题的方式有点模棱两可,因为从 4.0 开始,OMP 循环可能是 SIMD、Threading 或 SIMD+Threading parallel。所以这不再是关于 OMP 与 SIMD 的问题了。相反,它是关于 OMP SIMD 与 OMP 线程的对比。
不确定您所用的 GCC 实现质量如何,但 ICC/IFORT 处理 omp parallel for simd 已经有相当长一段时间了。GCC 从 5.x 起也应该支持它(GCC 支持 #pragma omp simd 已有一段时间,但对 #pragma omp parallel for simd 的支持则不一定)。
为了优化编译器驱动的实现,理想情况下,您可能更喜欢进行缓存阻塞和手动拆分迭代 space 以使外层循环由 omp parallel for 驱动,而最内层循环由 omp simd 驱动。但这可能稍微超出了原始问题的范围。