OpenMP 多线程更新同一个数组
OpenMP multiple threads update same array
我的程序中有以下代码,我想使用 OpenMP 对其进行加速。
...
for(i=curr_index; i < curr_index + rx_size; i+=2){
int64_t tgt = rcvq[i];
int64_t src = rcvq[i+1];
if (!TEST(tgt)) {
pred[tgt] = src;
newq[newq_count++] = tgt;
}
}
目前我的版本如下:
...
chunk = rx_sz / omp_nthreads;
#pragma omp parallel for num_threads(omp_nthreads)
for (ii = 0; ii < omp_nthreads; ii++) {
int start = curr_index + ii * chunk;
for (index = start; index < start + chunk; index +=2) {
int64_t tgt = rcvq[index];
int64_t src = rcvq[index+1];
if (!TEST(tgt)) {
pred[tgt] = src;
#pragma omp critical
newq[newq_count++] = tgt;
}
}
}
当我 运行 OpenMP 版本时,我发现与原始版本相比性能下降很大。我认为问题可能是因为 "omp critical" 阻止了并行处理。我想知道我的代码可以增强哪些功能,以便我可以获得比串行版本更好的性能。在代码中,rx_sz 始终是 omp_nthreads 的倍数。
我很确定此时 omp 关键部分限制了您的性能。
我建议您将结果收集到单独的 buffers/vectors 中,并在并行处理完成后合并它们(当然,如果顺序对您来说不重要)
vector<vector<int64_t>> res;
res.resize(num_threads);
#pragma omp parallel for
for (index = 0; index < rx_sz/2; ++index) {
int64_t tgt = rcvq[2*index];
int64_t src = rcvq[2*index+1];
if (!TEST(tgt)) {
pred[tgt] = src;
res[omp_get_thread_num()].push_back(tgt);
}
}
// Merge all res vectors if needed
是的,关键部分限制了你的表现。您应该在本地为每个线程收集结果,然后合并它们。
size_t newq_offset = 0;
#pragma omp parallel
{
// Figure out something clever here...
const size_t max_newq_per_thread = max_newq / omp_get_num_threads();
int64_t* local_newq = malloc(max_results_per_thread * sizeof(int64_t));
size_t local_newq_count = 0;
#pragma omp parallel for
for (i=curr_index; i < curr_index + rx_size; i+=2)
int64_t tgt = rcvq[2*index];
int64_t src = rcvq[2*index+1];
if (!TEST(tgt)) {
pred[tgt] = src;
local_newq_count++;
assert(local_newq_count < max_newq_per_thread);
local_newq[local_newq_count] = tgt;
}
}
int local_offset;
#pragma omp atomic capture
{
local_offset = offset;
offset += local_newq_count;
}
for (size_t i = 0; i < counter; i++)
{
res_global[i + local_offset] = res[i];
}
}
使用这种方法,所有线程在合并时并行工作,atomic capture
上的争用很小。请注意,您还可以使用 atomic capture
制作一个简单版本,它比临界区更高效,但仍然会很快成为瓶颈:
size_t newq_count_local;
#pragma omp atomic capture
newq_count_local = newq_count++;
newq[newq_count_local] = tgt;
- 在任何版本
中newq
内不保证顺序
- 始终尽可能将变量声明为局部变量!特别是在使用 OpenMP 时。您发布的
critical
版本是 错误的 ,因为 index
(在外部范围内定义)在线程之间隐式共享。
- 所有这些都假定
rcvq
中没有重复项。否则,您会在 pred[tgt] = src;
. 上出现竞争条件
- 您手动分割循环的方法不必要地复杂。不需要做两个循环,只需使用一个
pragma omp for
循环。
另一个答案是正确的。但是,它是 C++,而不是标记的 C。使用 std::vector<std::vector<>>
时还有一个微妙但重要的性能问题。通常一个vector是用三个指针来实现的,一共24个字节。在 push_back
时,其中一个指针递增。这意味着,a) 来自多个线程的本地向量指针驻留在同一个缓存行上,并且 b) 在每次成功的 TEST
、push_back
读取和写入其他线程使用的缓存行时线程。这个缓存行必须一直在核心之间移动,极大地限制了这种方法的可扩展性。这就是所谓的虚假分享。
我根据另一个答案实施了一个小测试,结果如下:
0.99 s
- 单线程
1.58 s
- 同一套接字的两个相邻核心上的两个线程
2.13 s
- 不同套接字的两个内核上的两个线程
0.99 s
- 两个线程共享一个核心
0.62 s
- 两个套接字上的 24 个线程
而 C 版本以上的扩展性更好:
0.46 s
- 单线程(无法真正比较 C 与 C++)
0.24 s
- 两个线程
0.04 s
- 24 个线程
我的程序中有以下代码,我想使用 OpenMP 对其进行加速。
...
for(i=curr_index; i < curr_index + rx_size; i+=2){
int64_t tgt = rcvq[i];
int64_t src = rcvq[i+1];
if (!TEST(tgt)) {
pred[tgt] = src;
newq[newq_count++] = tgt;
}
}
目前我的版本如下:
...
chunk = rx_sz / omp_nthreads;
#pragma omp parallel for num_threads(omp_nthreads)
for (ii = 0; ii < omp_nthreads; ii++) {
int start = curr_index + ii * chunk;
for (index = start; index < start + chunk; index +=2) {
int64_t tgt = rcvq[index];
int64_t src = rcvq[index+1];
if (!TEST(tgt)) {
pred[tgt] = src;
#pragma omp critical
newq[newq_count++] = tgt;
}
}
}
当我 运行 OpenMP 版本时,我发现与原始版本相比性能下降很大。我认为问题可能是因为 "omp critical" 阻止了并行处理。我想知道我的代码可以增强哪些功能,以便我可以获得比串行版本更好的性能。在代码中,rx_sz 始终是 omp_nthreads 的倍数。
我很确定此时 omp 关键部分限制了您的性能。
我建议您将结果收集到单独的 buffers/vectors 中,并在并行处理完成后合并它们(当然,如果顺序对您来说不重要)
vector<vector<int64_t>> res;
res.resize(num_threads);
#pragma omp parallel for
for (index = 0; index < rx_sz/2; ++index) {
int64_t tgt = rcvq[2*index];
int64_t src = rcvq[2*index+1];
if (!TEST(tgt)) {
pred[tgt] = src;
res[omp_get_thread_num()].push_back(tgt);
}
}
// Merge all res vectors if needed
是的,关键部分限制了你的表现。您应该在本地为每个线程收集结果,然后合并它们。
size_t newq_offset = 0;
#pragma omp parallel
{
// Figure out something clever here...
const size_t max_newq_per_thread = max_newq / omp_get_num_threads();
int64_t* local_newq = malloc(max_results_per_thread * sizeof(int64_t));
size_t local_newq_count = 0;
#pragma omp parallel for
for (i=curr_index; i < curr_index + rx_size; i+=2)
int64_t tgt = rcvq[2*index];
int64_t src = rcvq[2*index+1];
if (!TEST(tgt)) {
pred[tgt] = src;
local_newq_count++;
assert(local_newq_count < max_newq_per_thread);
local_newq[local_newq_count] = tgt;
}
}
int local_offset;
#pragma omp atomic capture
{
local_offset = offset;
offset += local_newq_count;
}
for (size_t i = 0; i < counter; i++)
{
res_global[i + local_offset] = res[i];
}
}
使用这种方法,所有线程在合并时并行工作,atomic capture
上的争用很小。请注意,您还可以使用 atomic capture
制作一个简单版本,它比临界区更高效,但仍然会很快成为瓶颈:
size_t newq_count_local;
#pragma omp atomic capture
newq_count_local = newq_count++;
newq[newq_count_local] = tgt;
- 在任何版本 中
- 始终尽可能将变量声明为局部变量!特别是在使用 OpenMP 时。您发布的
critical
版本是 错误的 ,因为index
(在外部范围内定义)在线程之间隐式共享。 - 所有这些都假定
rcvq
中没有重复项。否则,您会在pred[tgt] = src;
. 上出现竞争条件
- 您手动分割循环的方法不必要地复杂。不需要做两个循环,只需使用一个
pragma omp for
循环。
newq
内不保证顺序
另一个答案是正确的。但是,它是 C++,而不是标记的 C。使用 std::vector<std::vector<>>
时还有一个微妙但重要的性能问题。通常一个vector是用三个指针来实现的,一共24个字节。在 push_back
时,其中一个指针递增。这意味着,a) 来自多个线程的本地向量指针驻留在同一个缓存行上,并且 b) 在每次成功的 TEST
、push_back
读取和写入其他线程使用的缓存行时线程。这个缓存行必须一直在核心之间移动,极大地限制了这种方法的可扩展性。这就是所谓的虚假分享。
我根据另一个答案实施了一个小测试,结果如下:
0.99 s
- 单线程1.58 s
- 同一套接字的两个相邻核心上的两个线程2.13 s
- 不同套接字的两个内核上的两个线程0.99 s
- 两个线程共享一个核心0.62 s
- 两个套接字上的 24 个线程
而 C 版本以上的扩展性更好:
0.46 s
- 单线程(无法真正比较 C 与 C++)0.24 s
- 两个线程0.04 s
- 24 个线程