使用 openmp 任务的部分并行循环
Partially parallel loops using openmp tasks
先决条件:
- 并行引擎:OpenMP 3.1+(如果需要可以是 OpenMP 4.0)
- 并行构造:OpenMP 任务
- 编译器:gcc 4.9.x(支持 OpenMP 4.0)
输入:
- 带循环的 C 代码
- 循环有交叉迭代数据依赖:“i+1”迭代需要来自“i”迭代的数据(只有这种依赖,没有别的)
- 循环体可以部分依赖
- 循环不能分成两个循环;循环体应该保持稳定
- 任何合理的东西都可以添加到循环或循环体函数定义中
代码示例:
(此处conf/config/configData变量仅供参考,主要兴趣在value/valueData变量内。)
void loopFunc(const char* config, int* value)
{
int conf;
conf = prepare(config); // independent, does not change “config”
*value = process(conf, *value); // dependent, takes prev., produce next
return;
}
int main()
{
int N = 100;
char* configData; // never changes
int valueData = 0; // initial value
…
for (int i = 0; i < N; i++)
{
loopFunc(configData, &valueData);
}
…
}
需要:
- 使用 omp 任务的并行循环(不能使用 omp for / omp 部分)
- “准备”功能应与其他“准备”或“处理”功能并行执行
- “处理”函数应根据数据依赖性排序
已提出并实施的内容:
- 定义整数标志
- 给它分配一个第一次迭代的编号
- 每次迭代需要数据时等待标志等于它的迭代
- 下一次迭代的数据准备就绪时更新标志值
像这样:
(提醒一下,conf/config/configData变量仅供参考,主要兴趣在value/valueData变量内。)
void loopFunc(const char* config, int* value, volatile int *parSync, int iteration)
{
int conf;
conf = prepare(config); // independent, do not change “config”
while (*parSync != iteration) // wait for previous to be ready
{
#pragma omp taskyield
}
*value = process(conf, *value); // dependent, takes prev., produce next
*parSync = iteration + 1; // inform next about readiness
return;
}
int main()
{
int N = 100;
char* configData; // never changes
int valueData = 0; // initial value
volatile int parallelSync = 0;
…
omp_set_num_threads(5);
#pragma omp parallel
#pragma omp single
for (int i = 0; i < N; i++)
{
#pragma omp task shared(configData, valueData, parallelSync) firstprivate(i)
loopFunc(configData, &valueData, ¶llelSync, i);
}
#pragma omp taskwait
…
}
发生了什么:
失败了。 :)
原因是openmp任务占用了openmp线程。
例如,如果我们定义 5 个 openmp 线程(如上面的代码)。
- “For”循环生成 100 个任务。
- OpenMP 运行时将 5 个任意任务分配给 5 个线程并启动这些任务。
如果启动的任务中没有i=0的任务(时常发生),执行中的任务永远等待,永远占用线程,i=0的任务永远不会启动。
下一步是什么?
我不知道如何实现所需的计算模式。
当前解
感谢下面@parallelgeek 的想法
int main()
{
int N = 10;
char* configData; // never changes
int valueData = 0; // initial value
volatile int parallelSync = 0;
int workers;
volatile int workingTasks = 0;
...
omp_set_num_threads(5);
#pragma omp parallel
#pragma omp single
{
workers = omp_get_num_threads()-1; // reserve 1 thread for task generation
for (int i = 0; i < N; i++)
{
while (workingTasks >= workers)
{
#pragma omp taskyield
}
#pragma omp atomic update
workingTasks++;
#pragma omp task shared(configData, valueData, parallelSync, workingTasks) firstprivate(i)
{
loopFunc(configData, &valueData, ¶llelSync, i);
#pragma omp atomic update
workingTasks--;
}
}
#pragma omp taskwait
}
}
- AFAIK volatiles 不会阻止硬件重新排序,这就是为什么你
最终可能会导致内存混乱,因为数据尚未写入,
而标志已被消费线程视为
true
。
- 这就是为什么要提一点建议:改用 C11 原子以确保数据的可见性。如我所见,gcc 4.9 支持 c11 C11Status in GCC
- 您可以尝试将生成的任务分成 K 个任务组,其中
K == ThreadNum
并仅在任何 运行 个任务之后才开始生成后续任务(在第一组中的任务生成之后)完成了。因此,您有一个 不变性 ,即 每次您只有 K 个任务 运行 并安排在 K 个线程上 。
- 也可以通过使用 C11 中的原子标志来满足任务间依赖性。
先决条件:
- 并行引擎:OpenMP 3.1+(如果需要可以是 OpenMP 4.0)
- 并行构造:OpenMP 任务
- 编译器:gcc 4.9.x(支持 OpenMP 4.0)
输入:
- 带循环的 C 代码
- 循环有交叉迭代数据依赖:“i+1”迭代需要来自“i”迭代的数据(只有这种依赖,没有别的)
- 循环体可以部分依赖
- 循环不能分成两个循环;循环体应该保持稳定
- 任何合理的东西都可以添加到循环或循环体函数定义中
代码示例:
(此处conf/config/configData变量仅供参考,主要兴趣在value/valueData变量内。)
void loopFunc(const char* config, int* value)
{
int conf;
conf = prepare(config); // independent, does not change “config”
*value = process(conf, *value); // dependent, takes prev., produce next
return;
}
int main()
{
int N = 100;
char* configData; // never changes
int valueData = 0; // initial value
…
for (int i = 0; i < N; i++)
{
loopFunc(configData, &valueData);
}
…
}
需要:
- 使用 omp 任务的并行循环(不能使用 omp for / omp 部分)
- “准备”功能应与其他“准备”或“处理”功能并行执行
- “处理”函数应根据数据依赖性排序
已提出并实施的内容:
- 定义整数标志
- 给它分配一个第一次迭代的编号
- 每次迭代需要数据时等待标志等于它的迭代
- 下一次迭代的数据准备就绪时更新标志值
像这样:
(提醒一下,conf/config/configData变量仅供参考,主要兴趣在value/valueData变量内。)
void loopFunc(const char* config, int* value, volatile int *parSync, int iteration)
{
int conf;
conf = prepare(config); // independent, do not change “config”
while (*parSync != iteration) // wait for previous to be ready
{
#pragma omp taskyield
}
*value = process(conf, *value); // dependent, takes prev., produce next
*parSync = iteration + 1; // inform next about readiness
return;
}
int main()
{
int N = 100;
char* configData; // never changes
int valueData = 0; // initial value
volatile int parallelSync = 0;
…
omp_set_num_threads(5);
#pragma omp parallel
#pragma omp single
for (int i = 0; i < N; i++)
{
#pragma omp task shared(configData, valueData, parallelSync) firstprivate(i)
loopFunc(configData, &valueData, ¶llelSync, i);
}
#pragma omp taskwait
…
}
发生了什么:
失败了。 :)
原因是openmp任务占用了openmp线程。 例如,如果我们定义 5 个 openmp 线程(如上面的代码)。
- “For”循环生成 100 个任务。
- OpenMP 运行时将 5 个任意任务分配给 5 个线程并启动这些任务。
如果启动的任务中没有i=0的任务(时常发生),执行中的任务永远等待,永远占用线程,i=0的任务永远不会启动。
下一步是什么?
我不知道如何实现所需的计算模式。
当前解
感谢下面@parallelgeek 的想法
int main()
{
int N = 10;
char* configData; // never changes
int valueData = 0; // initial value
volatile int parallelSync = 0;
int workers;
volatile int workingTasks = 0;
...
omp_set_num_threads(5);
#pragma omp parallel
#pragma omp single
{
workers = omp_get_num_threads()-1; // reserve 1 thread for task generation
for (int i = 0; i < N; i++)
{
while (workingTasks >= workers)
{
#pragma omp taskyield
}
#pragma omp atomic update
workingTasks++;
#pragma omp task shared(configData, valueData, parallelSync, workingTasks) firstprivate(i)
{
loopFunc(configData, &valueData, ¶llelSync, i);
#pragma omp atomic update
workingTasks--;
}
}
#pragma omp taskwait
}
}
- AFAIK volatiles 不会阻止硬件重新排序,这就是为什么你
最终可能会导致内存混乱,因为数据尚未写入,
而标志已被消费线程视为
true
。 - 这就是为什么要提一点建议:改用 C11 原子以确保数据的可见性。如我所见,gcc 4.9 支持 c11 C11Status in GCC
- 您可以尝试将生成的任务分成 K 个任务组,其中
K == ThreadNum
并仅在任何 运行 个任务之后才开始生成后续任务(在第一组中的任务生成之后)完成了。因此,您有一个 不变性 ,即 每次您只有 K 个任务 运行 并安排在 K 个线程上 。 - 也可以通过使用 C11 中的原子标志来满足任务间依赖性。