并行计算积分

Parallizing the calculation of an integral

这里我有一段代码是一个计算函数积分的函数。在代码中将 function() 定义为要集成的函数。

我正在学习并行编程,我需要并行编写这段代码。原始程序是顺序的,因为每次迭代都会向另一个处理器完成发送操作。为了使其并行,我想要实现的是,每个循环迭代 3 个发送操作都执行到其他 3 个可用处理器。假设有 1 个处理器负责分配任务(等级 = 0),另外 3 个处理器负责实际计算。

注意这是一大段代码,但我还添加了注释以使其更加清晰:

序号:

    if (myRank == 0)
    {
        // I am the controller, distribute the work
        for (step = 0; step < maxSteps; step++)
        {
            x[0] = x_start + stepSize*step;
            x[1] = x_start + stepSize*(step+1);
            nextRank = step % (numProcs-1) + 1;
            // Send the work
            MPI_Send(x, 2, MPI_DOUBLE, nextRank, TAG_WORK, MPI_COMM_WORLD);
            // Receive the result
            MPI_Recv(y, 2, MPI_DOUBLE, nextRank, TAG_WORK, MPI_COMM_WORLD,
                MPI_STATUS_IGNORE);
            sum += stepSize*0.5*(y[0]+y[1]);
        }
        // Signal workers to stop by sending empty messages with tag TAG_END
        for (nextRank = 1; nextRank < numProcs; nextRank++)
            MPI_Send(&nextRank, 0, MPI_INT, nextRank, TAG_END, MPI_COMM_WORLD);
    }
    else
    {
        while (1)
        {
            // I am a worker, wait for work

            // Receive the left and right points of the trapezoid and compute
            // the corresponding function values. If the tag is TAG_END, don't
            // compute but exit.
            MPI_Recv(x, 2, MPI_DOUBLE, 0, MPI_ANY_TAG, MPI_COMM_WORLD,
                &status);
            if (status.MPI_TAG == TAG_END) break;
            y[0] = f(x[0]);
            y[1] = f(x[1]);
            // Send back the computed result
            MPI_Send(y, 2, MPI_DOUBLE, 0, TAG_WORK, MPI_COMM_WORLD);
        }
    }
    return sum;
}

为了并行化它,我真的对它进行了硬编码,以明确我的工作。我用 3 步使循环递增。我添加了新数组来存储 x 和 y 值。我所做的是首先收集特定数组中的 x 值。然后我将每个 x 值数组发送到一个新处理器。然后我执行另一个函数来获取 y 值。然后我将它们发送回处理器 (rank = 0) 以添加所有 'integration slices'.

尝试并行化的代码

 if (myRank == 0)
    {
        // I am the controller, distribute the work
        for (step = 0; step < maxSteps; step+3)
        {
            x1[0] = x_start + stepSize*step;
            x1[1] = x_start + stepSize*(step+1);
            x2[0] = x_start + stepSize*(step+1);
            x2[1] = x_start + stepSize*((step+1)+1);
            x3[0] = x_start + stepSize*(step+2);
            x3[1] = x_start + stepSize*((step+1)+2);
            nextRank = step % (numProcs-1) + 1;
            // Send the work
            MPI_Send(x1, 2, MPI_DOUBLE, 1, TAG_WORK, MPI_COMM_WORLD);
            MPI_Send(x2, 2, MPI_DOUBLE, 2, TAG_WORK, MPI_COMM_WORLD);
            MPI_Send(x3, 2, MPI_DOUBLE, 3, TAG_WORK, MPI_COMM_WORLD);
            // Receive the result
            MPI_Recv(y1, 2, MPI_DOUBLE, 1, TAG_WORK, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            sum += stepSize*0.5*(y1[0]+y1[1]);
            MPI_Recv(y2, 2, MPI_DOUBLE, 2, TAG_WORK, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            sum += stepSize*0.5*(y2[0]+y2[1]);
            MPI_Recv(y3, 2, MPI_DOUBLE, 3, TAG_WORK, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            sum += stepSize*0.5*(y3[0]+y3[1]);
        }
        // Signal workers to stop by sending empty messages with tag TAG_END
        for (nextRank = 1; nextRank < numProcs; nextRank++)
            MPI_Send(&nextRank, 0, MPI_INT, nextRank, TAG_END, MPI_COMM_WORLD);
    }
    else if (myRank = 1)
    {
        while (1)
        {
            MPI_Recv(x1, 2, MPI_DOUBLE, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
            if (status.MPI_TAG == TAG_END) break;
            y1[0] = func(x1[0]);
            y1[1] = func(x1[1]);
            // Send back the computed result
            MPI_Send(y1, 2, MPI_DOUBLE, 0, TAG_WORK, MPI_COMM_WORLD);
        }
    }
    
    else if (myRank = 2)
    {
        while (1)
        {
            MPI_Recv(x2, 2, MPI_DOUBLE, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
            if (status.MPI_TAG == TAG_END) break;
            y2[0] = func(x2[0]);
            y2[1] = func(x2[1]);
            // Send back the computed result
            MPI_Send(y2, 2, MPI_DOUBLE, 0, TAG_WORK, MPI_COMM_WORLD);
        }
    }
    
    else if (myRank = 3)
    {
        while (1)
        {
            MPI_Recv(x3, 2, MPI_DOUBLE, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
            if (status.MPI_TAG == TAG_END) break;
            y3[0] = func(x3[0]);
            y3[1] = func(x3[1]);
            // Send back the computed result
            MPI_Send(y3, 2, MPI_DOUBLE, 0, TAG_WORK, MPI_COMM_WORLD);
        }
    }
    return sum;
}

问题是我不再获得输出。恐怕我造成了僵局,但我无法发现在哪里。我可以获得有关此方法的反馈吗?

来源:https://doc.itc.rwth-aachen.de/display/VE/PPCES+2012

如果你想从拥有8个核心中获利(这只是一个例子)你能做的最好的(也是最简单的)就是把你的积分区间分成八个部分(你可以任意分区,给每个相同数量的工作,这取决于你)然后独立计算每个线程中的每个积分(使用与一个线程相同的循环)。

这种方式不会改变你原来的计算方式,并且计算之间完全独立(所以完全不存在资源争用)

最后你只需要将八个积分相加就可以得到你想要的结果

如果您正在考虑诸如展开循环以实现更多并行性的事情,那么您最好相信您的编译器,它能够并行使用他的优化器来获得超过 32 个寄存器的收益 cpu 有今天,你很可能不会做得更好。

此处建议的方法将您的积分转换为 8 种不同的积分计算,每种计算具有不同的参数和不同的值,并且一个线程中的演算不依赖于其他线程中的演算,因此,即使在基于线程内核的情况下在管道上,您永远不必重新排序或复杂化指令,因为很容易将另一个线程的指令添加到管道中以避免产生气泡。如果你有8个核心,实际上超过8个线程计算一些东西并不代表任何有利的任务。