并行计算积分
Parallizing the calculation of an integral
这里我有一段代码是一个计算函数积分的函数。在代码中将 function() 定义为要集成的函数。
我正在学习并行编程,我需要并行编写这段代码。原始程序是顺序的,因为每次迭代都会向另一个处理器完成发送操作。为了使其并行,我想要实现的是,每个循环迭代 3 个发送操作都执行到其他 3 个可用处理器。假设有 1 个处理器负责分配任务(等级 = 0),另外 3 个处理器负责实际计算。
注意这是一大段代码,但我还添加了注释以使其更加清晰:
序号:
if (myRank == 0)
{
// I am the controller, distribute the work
for (step = 0; step < maxSteps; step++)
{
x[0] = x_start + stepSize*step;
x[1] = x_start + stepSize*(step+1);
nextRank = step % (numProcs-1) + 1;
// Send the work
MPI_Send(x, 2, MPI_DOUBLE, nextRank, TAG_WORK, MPI_COMM_WORLD);
// Receive the result
MPI_Recv(y, 2, MPI_DOUBLE, nextRank, TAG_WORK, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
sum += stepSize*0.5*(y[0]+y[1]);
}
// Signal workers to stop by sending empty messages with tag TAG_END
for (nextRank = 1; nextRank < numProcs; nextRank++)
MPI_Send(&nextRank, 0, MPI_INT, nextRank, TAG_END, MPI_COMM_WORLD);
}
else
{
while (1)
{
// I am a worker, wait for work
// Receive the left and right points of the trapezoid and compute
// the corresponding function values. If the tag is TAG_END, don't
// compute but exit.
MPI_Recv(x, 2, MPI_DOUBLE, 0, MPI_ANY_TAG, MPI_COMM_WORLD,
&status);
if (status.MPI_TAG == TAG_END) break;
y[0] = f(x[0]);
y[1] = f(x[1]);
// Send back the computed result
MPI_Send(y, 2, MPI_DOUBLE, 0, TAG_WORK, MPI_COMM_WORLD);
}
}
return sum;
}
为了并行化它,我真的对它进行了硬编码,以明确我的工作。我用 3 步使循环递增。我添加了新数组来存储 x 和 y 值。我所做的是首先收集特定数组中的 x 值。然后我将每个 x 值数组发送到一个新处理器。然后我执行另一个函数来获取 y 值。然后我将它们发送回处理器 (rank = 0) 以添加所有 'integration slices'.
尝试并行化的代码
if (myRank == 0)
{
// I am the controller, distribute the work
for (step = 0; step < maxSteps; step+3)
{
x1[0] = x_start + stepSize*step;
x1[1] = x_start + stepSize*(step+1);
x2[0] = x_start + stepSize*(step+1);
x2[1] = x_start + stepSize*((step+1)+1);
x3[0] = x_start + stepSize*(step+2);
x3[1] = x_start + stepSize*((step+1)+2);
nextRank = step % (numProcs-1) + 1;
// Send the work
MPI_Send(x1, 2, MPI_DOUBLE, 1, TAG_WORK, MPI_COMM_WORLD);
MPI_Send(x2, 2, MPI_DOUBLE, 2, TAG_WORK, MPI_COMM_WORLD);
MPI_Send(x3, 2, MPI_DOUBLE, 3, TAG_WORK, MPI_COMM_WORLD);
// Receive the result
MPI_Recv(y1, 2, MPI_DOUBLE, 1, TAG_WORK, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
sum += stepSize*0.5*(y1[0]+y1[1]);
MPI_Recv(y2, 2, MPI_DOUBLE, 2, TAG_WORK, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
sum += stepSize*0.5*(y2[0]+y2[1]);
MPI_Recv(y3, 2, MPI_DOUBLE, 3, TAG_WORK, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
sum += stepSize*0.5*(y3[0]+y3[1]);
}
// Signal workers to stop by sending empty messages with tag TAG_END
for (nextRank = 1; nextRank < numProcs; nextRank++)
MPI_Send(&nextRank, 0, MPI_INT, nextRank, TAG_END, MPI_COMM_WORLD);
}
else if (myRank = 1)
{
while (1)
{
MPI_Recv(x1, 2, MPI_DOUBLE, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
if (status.MPI_TAG == TAG_END) break;
y1[0] = func(x1[0]);
y1[1] = func(x1[1]);
// Send back the computed result
MPI_Send(y1, 2, MPI_DOUBLE, 0, TAG_WORK, MPI_COMM_WORLD);
}
}
else if (myRank = 2)
{
while (1)
{
MPI_Recv(x2, 2, MPI_DOUBLE, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
if (status.MPI_TAG == TAG_END) break;
y2[0] = func(x2[0]);
y2[1] = func(x2[1]);
// Send back the computed result
MPI_Send(y2, 2, MPI_DOUBLE, 0, TAG_WORK, MPI_COMM_WORLD);
}
}
else if (myRank = 3)
{
while (1)
{
MPI_Recv(x3, 2, MPI_DOUBLE, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
if (status.MPI_TAG == TAG_END) break;
y3[0] = func(x3[0]);
y3[1] = func(x3[1]);
// Send back the computed result
MPI_Send(y3, 2, MPI_DOUBLE, 0, TAG_WORK, MPI_COMM_WORLD);
}
}
return sum;
}
问题是我不再获得输出。恐怕我造成了僵局,但我无法发现在哪里。我可以获得有关此方法的反馈吗?
如果你想从拥有8个核心中获利(这只是一个例子)你能做的最好的(也是最简单的)就是把你的积分区间分成八个部分(你可以任意分区,给每个相同数量的工作,这取决于你)然后独立计算每个线程中的每个积分(使用与一个线程相同的循环)。
这种方式不会改变你原来的计算方式,并且计算之间完全独立(所以完全不存在资源争用)
最后你只需要将八个积分相加就可以得到你想要的结果
如果您正在考虑诸如展开循环以实现更多并行性的事情,那么您最好相信您的编译器,它能够并行使用他的优化器来获得超过 32 个寄存器的收益 cpu 有今天,你很可能不会做得更好。
此处建议的方法将您的积分转换为 8 种不同的积分计算,每种计算具有不同的参数和不同的值,并且一个线程中的演算不依赖于其他线程中的演算,因此,即使在基于线程内核的情况下在管道上,您永远不必重新排序或复杂化指令,因为很容易将另一个线程的指令添加到管道中以避免产生气泡。如果你有8个核心,实际上超过8个线程计算一些东西并不代表任何有利的任务。
这里我有一段代码是一个计算函数积分的函数。在代码中将 function() 定义为要集成的函数。
我正在学习并行编程,我需要并行编写这段代码。原始程序是顺序的,因为每次迭代都会向另一个处理器完成发送操作。为了使其并行,我想要实现的是,每个循环迭代 3 个发送操作都执行到其他 3 个可用处理器。假设有 1 个处理器负责分配任务(等级 = 0),另外 3 个处理器负责实际计算。
注意这是一大段代码,但我还添加了注释以使其更加清晰:
序号:
if (myRank == 0)
{
// I am the controller, distribute the work
for (step = 0; step < maxSteps; step++)
{
x[0] = x_start + stepSize*step;
x[1] = x_start + stepSize*(step+1);
nextRank = step % (numProcs-1) + 1;
// Send the work
MPI_Send(x, 2, MPI_DOUBLE, nextRank, TAG_WORK, MPI_COMM_WORLD);
// Receive the result
MPI_Recv(y, 2, MPI_DOUBLE, nextRank, TAG_WORK, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
sum += stepSize*0.5*(y[0]+y[1]);
}
// Signal workers to stop by sending empty messages with tag TAG_END
for (nextRank = 1; nextRank < numProcs; nextRank++)
MPI_Send(&nextRank, 0, MPI_INT, nextRank, TAG_END, MPI_COMM_WORLD);
}
else
{
while (1)
{
// I am a worker, wait for work
// Receive the left and right points of the trapezoid and compute
// the corresponding function values. If the tag is TAG_END, don't
// compute but exit.
MPI_Recv(x, 2, MPI_DOUBLE, 0, MPI_ANY_TAG, MPI_COMM_WORLD,
&status);
if (status.MPI_TAG == TAG_END) break;
y[0] = f(x[0]);
y[1] = f(x[1]);
// Send back the computed result
MPI_Send(y, 2, MPI_DOUBLE, 0, TAG_WORK, MPI_COMM_WORLD);
}
}
return sum;
}
为了并行化它,我真的对它进行了硬编码,以明确我的工作。我用 3 步使循环递增。我添加了新数组来存储 x 和 y 值。我所做的是首先收集特定数组中的 x 值。然后我将每个 x 值数组发送到一个新处理器。然后我执行另一个函数来获取 y 值。然后我将它们发送回处理器 (rank = 0) 以添加所有 'integration slices'.
尝试并行化的代码
if (myRank == 0)
{
// I am the controller, distribute the work
for (step = 0; step < maxSteps; step+3)
{
x1[0] = x_start + stepSize*step;
x1[1] = x_start + stepSize*(step+1);
x2[0] = x_start + stepSize*(step+1);
x2[1] = x_start + stepSize*((step+1)+1);
x3[0] = x_start + stepSize*(step+2);
x3[1] = x_start + stepSize*((step+1)+2);
nextRank = step % (numProcs-1) + 1;
// Send the work
MPI_Send(x1, 2, MPI_DOUBLE, 1, TAG_WORK, MPI_COMM_WORLD);
MPI_Send(x2, 2, MPI_DOUBLE, 2, TAG_WORK, MPI_COMM_WORLD);
MPI_Send(x3, 2, MPI_DOUBLE, 3, TAG_WORK, MPI_COMM_WORLD);
// Receive the result
MPI_Recv(y1, 2, MPI_DOUBLE, 1, TAG_WORK, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
sum += stepSize*0.5*(y1[0]+y1[1]);
MPI_Recv(y2, 2, MPI_DOUBLE, 2, TAG_WORK, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
sum += stepSize*0.5*(y2[0]+y2[1]);
MPI_Recv(y3, 2, MPI_DOUBLE, 3, TAG_WORK, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
sum += stepSize*0.5*(y3[0]+y3[1]);
}
// Signal workers to stop by sending empty messages with tag TAG_END
for (nextRank = 1; nextRank < numProcs; nextRank++)
MPI_Send(&nextRank, 0, MPI_INT, nextRank, TAG_END, MPI_COMM_WORLD);
}
else if (myRank = 1)
{
while (1)
{
MPI_Recv(x1, 2, MPI_DOUBLE, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
if (status.MPI_TAG == TAG_END) break;
y1[0] = func(x1[0]);
y1[1] = func(x1[1]);
// Send back the computed result
MPI_Send(y1, 2, MPI_DOUBLE, 0, TAG_WORK, MPI_COMM_WORLD);
}
}
else if (myRank = 2)
{
while (1)
{
MPI_Recv(x2, 2, MPI_DOUBLE, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
if (status.MPI_TAG == TAG_END) break;
y2[0] = func(x2[0]);
y2[1] = func(x2[1]);
// Send back the computed result
MPI_Send(y2, 2, MPI_DOUBLE, 0, TAG_WORK, MPI_COMM_WORLD);
}
}
else if (myRank = 3)
{
while (1)
{
MPI_Recv(x3, 2, MPI_DOUBLE, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
if (status.MPI_TAG == TAG_END) break;
y3[0] = func(x3[0]);
y3[1] = func(x3[1]);
// Send back the computed result
MPI_Send(y3, 2, MPI_DOUBLE, 0, TAG_WORK, MPI_COMM_WORLD);
}
}
return sum;
}
问题是我不再获得输出。恐怕我造成了僵局,但我无法发现在哪里。我可以获得有关此方法的反馈吗?
如果你想从拥有8个核心中获利(这只是一个例子)你能做的最好的(也是最简单的)就是把你的积分区间分成八个部分(你可以任意分区,给每个相同数量的工作,这取决于你)然后独立计算每个线程中的每个积分(使用与一个线程相同的循环)。
这种方式不会改变你原来的计算方式,并且计算之间完全独立(所以完全不存在资源争用)
最后你只需要将八个积分相加就可以得到你想要的结果
如果您正在考虑诸如展开循环以实现更多并行性的事情,那么您最好相信您的编译器,它能够并行使用他的优化器来获得超过 32 个寄存器的收益 cpu 有今天,你很可能不会做得更好。
此处建议的方法将您的积分转换为 8 种不同的积分计算,每种计算具有不同的参数和不同的值,并且一个线程中的演算不依赖于其他线程中的演算,因此,即使在基于线程内核的情况下在管道上,您永远不必重新排序或复杂化指令,因为很容易将另一个线程的指令添加到管道中以避免产生气泡。如果你有8个核心,实际上超过8个线程计算一些东西并不代表任何有利的任务。