这个 OpenMP 障碍有什么解决方法吗?
Is there any workaround for this OpenMP barrier?
我有这个用 OpenMp 编写的并行区域:
std::vector<T> sharedResult;
#pragma omp parallel
{
std::vector<T> result;
#pragma omp for nowait
for(int i=0; i<n; i++){
//fill result
}
#pragma omp critical{
sharedResult.insert(sharedResult.end(), result.begin(), result.end());
}
#pramga omp barrier
#pragma omp for nowait
for(size_t i=0; i<sharedResult.size(); i++){
foo(sharedResult[i]);
}
...
}
恐怕 #pragma omp barrier
是必要的。我认为的原因是,否则当一个线程命中最后一个#pragma omp for
时,那个时刻的sharedResult.size()
还不是他的最终状态(在前面的parallel for结束时得到的)。请注意,不幸的是 sharedResult
的大小以前是未知的。
不幸的是,我注意到这个障碍会产生很大的开销,即一个特定的迭代比所有其他迭代都更昂贵,因此所有线程都必须等待执行该迭代的线程。这可以认为是负载不平衡,但我没有找到解决这个问题的方法。
所以我的问题是:有没有什么方法可以启动最后一个并行而不等待前一个并行完成或者真的没有办法改进它?
我同意障碍是必要的。我看到了几种出路,随着复杂性的增加和效率的提高:
任务
Post 每个结果元素一个任务:
#pragma omp parallel
{
std::vector<T> result;
#pragma omp for nowait
for(int i=0; i<n; i++){
//fill result
}
// would prefer a range based loop here, but
// there seem to be issues with passing references
// to tasks in certain compilers
for(size_t i=0; i<result.size(); i++){
{
#pragma omp task
foo(result[i]);
}
}
您甚至可以 post 初始循环中的任务。如果任务太多,您可能会得到很大的开销。
正在用完成的线程处理结果队列
现在这个比较棘手 - 特别是您需要区分结果队列为空和所有线程完成其第一个循环。
std::vector<T> sharedResult;
int threadsBusy;
size_t resultIndex = 0;
#pragma omp parallel
{
#pragma omp single
threadsBusy = omp_num_threads();
std::vector<T> result;
#pragma omp for nowait
for(int i=0; i<n; i++){
//fill result
}
#pragma omp critical
{
sharedResult.insert(sharedResult.end(), result.begin(), result.end());
threadsBusy--;
}
do {
bool hasResult, allThreadsDone;
// We need a copy here as the vector may be resized
// and elements may become invalid by insertion
T myResult;
#pragma omp critical
{
if (resultIndex < sharedResult.size()) {
resultIndex++;
hasResult = true;
myResult = sharedResult[myResult];
} else {
hasResult = false;
}
allThreadsDone = threadsBusy == 0;
}
if (hasResult) {
foo(myResult);
} else {
if (allThreadsDone) {
break;
}
// If we just continue here, we will spin on the mutex
// Unfortunately there are no condition variables in OpenMP
// So instead we go for a quick nap as a compromise
// Feel free to tune this accordingly
std::this_thread::sleep_for(10ms);
}
} while (true);
}
注意:通常我会在这里测试我 post 的代码,但由于缺少完整的示例,我无法做到。
通过并行循环处理结果块
最后,对于那些已经完成的结果,您可以 运行 并行多次 for 循环。但是,这有很多问题。首先,所有线程 必须遇到每个工作共享区域,即使是那些较晚完成第一个线程的线程。所以你必须跟踪你的循环运行。此外,每个线程的循环绑定需要相同 - 并且您只能在关键部分读取 sharedResult.size()
。因此,您必须事先通过关键部分中的一个线程将其读取到共享变量,但要等待所有线程直到它被正确读取。此外,您将不得不使用动态调度,否则您可能会使用静态调度,并且您将等待最后完成的线程。您编辑的示例不会执行这些操作。我不会理所当然地认为 for nowait schedule(dynamic)
可以在团队中的所有线程进入它之前完成(但它适用于 libgomp)。综合考虑,我不会真的去那里。
我有这个用 OpenMp 编写的并行区域:
std::vector<T> sharedResult;
#pragma omp parallel
{
std::vector<T> result;
#pragma omp for nowait
for(int i=0; i<n; i++){
//fill result
}
#pragma omp critical{
sharedResult.insert(sharedResult.end(), result.begin(), result.end());
}
#pramga omp barrier
#pragma omp for nowait
for(size_t i=0; i<sharedResult.size(); i++){
foo(sharedResult[i]);
}
...
}
恐怕 #pragma omp barrier
是必要的。我认为的原因是,否则当一个线程命中最后一个#pragma omp for
时,那个时刻的sharedResult.size()
还不是他的最终状态(在前面的parallel for结束时得到的)。请注意,不幸的是 sharedResult
的大小以前是未知的。
不幸的是,我注意到这个障碍会产生很大的开销,即一个特定的迭代比所有其他迭代都更昂贵,因此所有线程都必须等待执行该迭代的线程。这可以认为是负载不平衡,但我没有找到解决这个问题的方法。
所以我的问题是:有没有什么方法可以启动最后一个并行而不等待前一个并行完成或者真的没有办法改进它?
我同意障碍是必要的。我看到了几种出路,随着复杂性的增加和效率的提高:
任务
Post 每个结果元素一个任务:
#pragma omp parallel
{
std::vector<T> result;
#pragma omp for nowait
for(int i=0; i<n; i++){
//fill result
}
// would prefer a range based loop here, but
// there seem to be issues with passing references
// to tasks in certain compilers
for(size_t i=0; i<result.size(); i++){
{
#pragma omp task
foo(result[i]);
}
}
您甚至可以 post 初始循环中的任务。如果任务太多,您可能会得到很大的开销。
正在用完成的线程处理结果队列
现在这个比较棘手 - 特别是您需要区分结果队列为空和所有线程完成其第一个循环。
std::vector<T> sharedResult;
int threadsBusy;
size_t resultIndex = 0;
#pragma omp parallel
{
#pragma omp single
threadsBusy = omp_num_threads();
std::vector<T> result;
#pragma omp for nowait
for(int i=0; i<n; i++){
//fill result
}
#pragma omp critical
{
sharedResult.insert(sharedResult.end(), result.begin(), result.end());
threadsBusy--;
}
do {
bool hasResult, allThreadsDone;
// We need a copy here as the vector may be resized
// and elements may become invalid by insertion
T myResult;
#pragma omp critical
{
if (resultIndex < sharedResult.size()) {
resultIndex++;
hasResult = true;
myResult = sharedResult[myResult];
} else {
hasResult = false;
}
allThreadsDone = threadsBusy == 0;
}
if (hasResult) {
foo(myResult);
} else {
if (allThreadsDone) {
break;
}
// If we just continue here, we will spin on the mutex
// Unfortunately there are no condition variables in OpenMP
// So instead we go for a quick nap as a compromise
// Feel free to tune this accordingly
std::this_thread::sleep_for(10ms);
}
} while (true);
}
注意:通常我会在这里测试我 post 的代码,但由于缺少完整的示例,我无法做到。
通过并行循环处理结果块
最后,对于那些已经完成的结果,您可以 运行 并行多次 for 循环。但是,这有很多问题。首先,所有线程 必须遇到每个工作共享区域,即使是那些较晚完成第一个线程的线程。所以你必须跟踪你的循环运行。此外,每个线程的循环绑定需要相同 - 并且您只能在关键部分读取 sharedResult.size()
。因此,您必须事先通过关键部分中的一个线程将其读取到共享变量,但要等待所有线程直到它被正确读取。此外,您将不得不使用动态调度,否则您可能会使用静态调度,并且您将等待最后完成的线程。您编辑的示例不会执行这些操作。我不会理所当然地认为 for nowait schedule(dynamic)
可以在团队中的所有线程进入它之前完成(但它适用于 libgomp)。综合考虑,我不会真的去那里。