OpenMP:写入与HDF5同步的数据
OpenMP: writing data synchronized with HDF5
我目前正在进行一个项目,其中必须使用 HDF5 创建一个大型数据集。现在,天真的实现非常好,但非常慢。慢的部分是计算(比写慢 10 倍),我不能再加速了,但也许并行化是可能的。
我想我可以使用一个简单的 #pragma omp parallel for 但是 dataspace.write(..) 方法应该出于速度原因而连续(也许无关紧要)。例如看这张照片。
需要注意的是,由于维度的原因,写入函数使用了与缓冲区大小相同的分块布局(实际上约为 1Mb)
/*
------------NAIVE IMPLEMENTATION-----------------
|T:<calc0><W0><calc1><W1><calc2><W2>............|
|-----------------------------------------------|
|----------PARALLEL IMPLEMENTATION--------------|
|-----------------------------------------------|
|T0:<calc0----><W0><calc4>.....<W4>.............|
|T1:<calc1---->....<W1><calc5->....<W5>.........|
|T2:<calc2--->.........<W2>calc6-->....<W6>.....|
|T3:<calc3----->...........<W3><calc7-->...<W7>.|
------------DIFFERENT IMPLEMENTATION-------------
i.e.: Queuesize=4
T0:.......<W0><W1><W2><W3><W4><W5><W6>..........|
T1:<calc0><calc3>.....<calc6>...................|
T2:<calc1>....<calc4>.....<calc7>...............|
T3:<calc2>........<calc5>.....<calc8>...........|
T Thread
<calcn---> Calculation time
<Wn> Write data n. Order *important*
. Waiting
*/
代码示例:
#include <chrono>
#include <cmath>
#include <iostream>
#include <memory>
double calculate(float *buf, const struct options *opts) {
// dummy function just to get a time reference
double res = 0;
for (size_t i = 0; i < 10000; i++)
res += std::sin(i);
return 1 / (1 + res);
}
struct options {
size_t idx[6];
};
class Dataspace {
public:
void selectHyperslab(){}; // selects region in disk space
void write(float *buf){}; // write buf to selected disk space
};
int main() {
size_t N = 6;
size_t dims[6] = {4 * N, 4 * N, 4 * N, 4 * N, 4 * N, 4 * N},
buf_offs[6] = {4, 4, 4, 4, 4, 4};
// dims: size of each dimension, multiple of 4
// buf_offs: size of buffer in each dimension
// Calcuate buffer size and allocate
// the size of the buffer is usually around 1Mb
// and not a float but a compund datatype
size_t buf_size = buf_offs[0];
for (auto off : buf_offs)
buf_size *= off;
std::unique_ptr<float[]> buf{new float[buf_size]};
struct options opts; // options parameters, passed to calculation fun
struct Dataspace dataspace; // dummy Dataspace. Supplied by HDF5
size_t i = 0;
size_t idx0, idx1, idx2, idx3, idx4, idx5;
auto t_start = std::chrono::high_resolution_clock::now();
std::cout << "[START]" << std::endl;
for (idx0 = 0; idx0 < dims[0]; idx0 += buf_offs[0])
for (idx1 = 0; idx1 < dims[1]; idx1 += buf_offs[1])
for (idx2 = 0; idx2 < dims[2]; idx2 += buf_offs[2])
for (idx3 = 0; idx3 < dims[3]; idx3 += buf_offs[3])
for (idx4 = 0; idx4 < dims[4]; idx4 += buf_offs[4])
for (idx5 = 0; idx5 < dims[5]; idx5 += buf_offs[5]) {
i++;
opts.idx[0] = idx0;
opts.idx[1] = idx1;
opts.idx[2] = idx2;
opts.idx[3] = idx3;
opts.idx[4] = idx4;
opts.idx[5] = idx5;
dataspace.selectHyperslab(/**/); // function from HDF5
calculate(buf.get(), &opts); // populate buf with data
dataspace.write(buf.get()); // has to be sequential
}
std::cout << "[DONE] " << i << " calls" << std::endl;
std::chrono::duration<double> diff =
std::chrono::high_resolution_clock::now() - t_start;
std::cout << "Time: " << diff.count() << std::endl;
return 0;
}
代码应该开箱即用。
我已经快速了解了 OpenMP,但我还不能全神贯注。谁能给我一个 hint/working 的例子?我不擅长并行化,但是带有缓冲区队列的编写器线程不能工作吗?还是使用 OpenMP 过度杀伤力而 pthreads 就足够了?
非常感谢任何帮助,
干杯
您的第一个并行实现想法是迄今为止最简单的实现。创建一个队列和一个专用的 I/O 线程可能会表现更好,但使用 OpenMP 实施起来要困难得多。
下面是并行版本的一个简单示例。最重要的方面是:
- 共享数据:确保线程之间共享的任何数据都没有竞争条件。例如,每个线程都必须有自己的
buf
和 opts
,因为它们显然是并行修改的,没有任何限制。最简单的方法是在并行区域内本地定义变量。循环 idxn
,至少对于内部循环,并且 i
必须在本地定义。您不能像以前那样计算 i
- 这会在每个循环迭代之间创建依赖关系并阻止并行化。
- 将
pragma omp for
工作共享应用于循环。由于每个维度的迭代量较小,建议应用collapse
。这将分配多个嵌套循环的工作。 collapse
的最佳值将为您的程序可用的线程数公开足够的并行工作,但不会产生太多开销或阻碍内部循环的单线程优化。您可能想尝试不同的值。
- 使用
critical
部分保护写入数据。一次只有一个线程会进入该部分。这很可能是正确性所必需的(取决于它在 hdf5 中的实现方式)。显然 selectHyperslab
将控制 write
的运行方式,因此它 必须 位于同一临界区。
放在一起,它可能看起来像这样:
#pragma omp parallel
{
// define EVERYTHING that is modified locally to each thread!
std::unique_ptr<float[]> buf{new float[buf_size]};
struct options opts;
// Try different values for collapse if performance is not satisfactory
#pragma omp for collapse(3)
for (size_t idx0 = 0; idx0 < dims[0]; idx0 += buf_offs[0])
for (size_t idx1 = 0; idx1 < dims[1]; idx1 += buf_offs[1])
for (size_t idx2 = 0; idx2 < dims[2]; idx2 += buf_offs[2])
for (size_t idx3 = 0; idx3 < dims[3]; idx3 += buf_offs[3])
for (size_t idx4 = 0; idx4 < dims[4]; idx4 += buf_offs[4])
for (size_t idx5 = 0; idx5 < dims[5]; idx5 += buf_offs[5]) {
size_t i = idx5 + idx4 * dims[5] + ...;
opts.idx[0] = idx0;
opts.idx[1] = idx1;
opts.idx[2] = idx2;
opts.idx[3] = idx3;
opts.idx[4] = idx4;
opts.idx[5] = idx5;
calculate(buf.get(), &opts); // populate buf with data
#pragma omp critical
{
// I do assume that this function selects where/how data
// will be written so you *must* protected it
// Only one thread can do this at a time.
dataspace.selectHyperslab(/**/); // function from HDF5
dataspace.write(buf.get()); // has to be sequential
}
}
}
我目前正在进行一个项目,其中必须使用 HDF5 创建一个大型数据集。现在,天真的实现非常好,但非常慢。慢的部分是计算(比写慢 10 倍),我不能再加速了,但也许并行化是可能的。
我想我可以使用一个简单的 #pragma omp parallel for 但是 dataspace.write(..) 方法应该出于速度原因而连续(也许无关紧要)。例如看这张照片。
需要注意的是,由于维度的原因,写入函数使用了与缓冲区大小相同的分块布局(实际上约为 1Mb)
/*
------------NAIVE IMPLEMENTATION-----------------
|T:<calc0><W0><calc1><W1><calc2><W2>............|
|-----------------------------------------------|
|----------PARALLEL IMPLEMENTATION--------------|
|-----------------------------------------------|
|T0:<calc0----><W0><calc4>.....<W4>.............|
|T1:<calc1---->....<W1><calc5->....<W5>.........|
|T2:<calc2--->.........<W2>calc6-->....<W6>.....|
|T3:<calc3----->...........<W3><calc7-->...<W7>.|
------------DIFFERENT IMPLEMENTATION-------------
i.e.: Queuesize=4
T0:.......<W0><W1><W2><W3><W4><W5><W6>..........|
T1:<calc0><calc3>.....<calc6>...................|
T2:<calc1>....<calc4>.....<calc7>...............|
T3:<calc2>........<calc5>.....<calc8>...........|
T Thread
<calcn---> Calculation time
<Wn> Write data n. Order *important*
. Waiting
*/
代码示例:
#include <chrono>
#include <cmath>
#include <iostream>
#include <memory>
double calculate(float *buf, const struct options *opts) {
// dummy function just to get a time reference
double res = 0;
for (size_t i = 0; i < 10000; i++)
res += std::sin(i);
return 1 / (1 + res);
}
struct options {
size_t idx[6];
};
class Dataspace {
public:
void selectHyperslab(){}; // selects region in disk space
void write(float *buf){}; // write buf to selected disk space
};
int main() {
size_t N = 6;
size_t dims[6] = {4 * N, 4 * N, 4 * N, 4 * N, 4 * N, 4 * N},
buf_offs[6] = {4, 4, 4, 4, 4, 4};
// dims: size of each dimension, multiple of 4
// buf_offs: size of buffer in each dimension
// Calcuate buffer size and allocate
// the size of the buffer is usually around 1Mb
// and not a float but a compund datatype
size_t buf_size = buf_offs[0];
for (auto off : buf_offs)
buf_size *= off;
std::unique_ptr<float[]> buf{new float[buf_size]};
struct options opts; // options parameters, passed to calculation fun
struct Dataspace dataspace; // dummy Dataspace. Supplied by HDF5
size_t i = 0;
size_t idx0, idx1, idx2, idx3, idx4, idx5;
auto t_start = std::chrono::high_resolution_clock::now();
std::cout << "[START]" << std::endl;
for (idx0 = 0; idx0 < dims[0]; idx0 += buf_offs[0])
for (idx1 = 0; idx1 < dims[1]; idx1 += buf_offs[1])
for (idx2 = 0; idx2 < dims[2]; idx2 += buf_offs[2])
for (idx3 = 0; idx3 < dims[3]; idx3 += buf_offs[3])
for (idx4 = 0; idx4 < dims[4]; idx4 += buf_offs[4])
for (idx5 = 0; idx5 < dims[5]; idx5 += buf_offs[5]) {
i++;
opts.idx[0] = idx0;
opts.idx[1] = idx1;
opts.idx[2] = idx2;
opts.idx[3] = idx3;
opts.idx[4] = idx4;
opts.idx[5] = idx5;
dataspace.selectHyperslab(/**/); // function from HDF5
calculate(buf.get(), &opts); // populate buf with data
dataspace.write(buf.get()); // has to be sequential
}
std::cout << "[DONE] " << i << " calls" << std::endl;
std::chrono::duration<double> diff =
std::chrono::high_resolution_clock::now() - t_start;
std::cout << "Time: " << diff.count() << std::endl;
return 0;
}
代码应该开箱即用。
我已经快速了解了 OpenMP,但我还不能全神贯注。谁能给我一个 hint/working 的例子?我不擅长并行化,但是带有缓冲区队列的编写器线程不能工作吗?还是使用 OpenMP 过度杀伤力而 pthreads 就足够了? 非常感谢任何帮助,
干杯
您的第一个并行实现想法是迄今为止最简单的实现。创建一个队列和一个专用的 I/O 线程可能会表现更好,但使用 OpenMP 实施起来要困难得多。
下面是并行版本的一个简单示例。最重要的方面是:
- 共享数据:确保线程之间共享的任何数据都没有竞争条件。例如,每个线程都必须有自己的
buf
和opts
,因为它们显然是并行修改的,没有任何限制。最简单的方法是在并行区域内本地定义变量。循环idxn
,至少对于内部循环,并且i
必须在本地定义。您不能像以前那样计算i
- 这会在每个循环迭代之间创建依赖关系并阻止并行化。 - 将
pragma omp for
工作共享应用于循环。由于每个维度的迭代量较小,建议应用collapse
。这将分配多个嵌套循环的工作。collapse
的最佳值将为您的程序可用的线程数公开足够的并行工作,但不会产生太多开销或阻碍内部循环的单线程优化。您可能想尝试不同的值。 - 使用
critical
部分保护写入数据。一次只有一个线程会进入该部分。这很可能是正确性所必需的(取决于它在 hdf5 中的实现方式)。显然selectHyperslab
将控制write
的运行方式,因此它 必须 位于同一临界区。
放在一起,它可能看起来像这样:
#pragma omp parallel
{
// define EVERYTHING that is modified locally to each thread!
std::unique_ptr<float[]> buf{new float[buf_size]};
struct options opts;
// Try different values for collapse if performance is not satisfactory
#pragma omp for collapse(3)
for (size_t idx0 = 0; idx0 < dims[0]; idx0 += buf_offs[0])
for (size_t idx1 = 0; idx1 < dims[1]; idx1 += buf_offs[1])
for (size_t idx2 = 0; idx2 < dims[2]; idx2 += buf_offs[2])
for (size_t idx3 = 0; idx3 < dims[3]; idx3 += buf_offs[3])
for (size_t idx4 = 0; idx4 < dims[4]; idx4 += buf_offs[4])
for (size_t idx5 = 0; idx5 < dims[5]; idx5 += buf_offs[5]) {
size_t i = idx5 + idx4 * dims[5] + ...;
opts.idx[0] = idx0;
opts.idx[1] = idx1;
opts.idx[2] = idx2;
opts.idx[3] = idx3;
opts.idx[4] = idx4;
opts.idx[5] = idx5;
calculate(buf.get(), &opts); // populate buf with data
#pragma omp critical
{
// I do assume that this function selects where/how data
// will be written so you *must* protected it
// Only one thread can do this at a time.
dataspace.selectHyperslab(/**/); // function from HDF5
dataspace.write(buf.get()); // has to be sequential
}
}
}