Boost mpi挂起
Boost mpi hangs on
我有一个简单的异步消息传递程序,如下所示。我在两台 PC 上以 16 个进程运行它。这些进程按 4x4 的环面(torus)网格互相连接,因此在 main 函数的开头可以看到每个进程的邻居。我想实现一个检查点算法:当 i = 5 或 10 时(把 i 看作时间),每个进程都会向其所有邻居发送一条消息。进程每收到一条消息,就会针对同一个发送方重新发起一个新的 irecv;但如果收到的是 i = 10 的消息,则不再发起 irecv,因为此后不会再有消息到达。在程序结束时,所有进程等待尚未收到的消息。
/* Demo_01_Main.cpp */
#include <boost/mpi.hpp>
#include <boost/serialization/string.hpp>
#include <string>
#include <iostream>
#include <fstream>
#include <map>
using namespace std;
/**
 * Message exchanged between ranks.
 *
 * Stores three integers: the destination rank, the originating rank and an
 * integer payload. The class is serializable through Boost.Serialization so
 * that Boost.MPI can transmit it.
 */
class Packet{
    friend class boost::serialization::access;
private:
    int receiver;   // rank the packet is addressed to
    int sender;     // rank that created the packet
    int data;       // integer payload
public:
    /// Creates a packet with every field set to zero.
    Packet() : receiver(0), sender(0), data(0) {}

    /// Creates a packet with the given destination, origin and payload.
    Packet(int receiver, int sender, int data)
        : receiver(receiver), sender(sender), data(data) {}

    // No user-declared destructor: the class owns no resources, so the
    // compiler-generated copy operations and destructor are correct.

    /// @return the payload.
    int getData() const {
        return data;
    }
    /// Replaces the payload.
    void setData(int data) {
        this->data = data;
    }
    /// @return the destination rank.
    int getReceiver() const {
        return receiver;
    }
    /// Replaces the destination rank.
    void setReceiver(int receiver) {
        this->receiver = receiver;
    }
    /// @return the originating rank.
    int getSender() const {
        return sender;
    }
    /// Replaces the originating rank.
    void setSender(int sender) {
        this->sender = sender;
    }

    /**
     * Boost.Serialization hook: reads or writes the three fields, in the
     * order receiver, sender, data. The class version is not used, so the
     * parameter is left unnamed to avoid an unused-parameter warning.
     */
    template<class Archive>
    void serialize(Archive& ar, const unsigned int /*version*/) {
        ar & receiver;
        ar & sender;
        ar & data;
    }

    /// @return a human-readable description of the packet, used for logging.
    std::string toString() const {
        std::stringstream ss;
        ss << "Packet = [Data: " << data << ", Receiver: " << receiver << ", Sender: " << sender << "]";
        return ss.str();
    }
};
/// Rank of the current process; used by log() to pick the log file name.
int rank;

/**
 * Appends text to this process's log file, "log_<rank>.txt".
 *
 * The file is opened in append mode on every call and closed again when the
 * stream goes out of scope, so each call leaves the file complete on disk.
 *
 * @param str text to append; written verbatim, no newline is added.
 */
void log(const std::string& str){
    std::stringstream logFileName;
    // `::rank` is spelled with the global-scope qualifier on purpose: together
    // with a file-scope `using namespace std;` the bare name `rank` is
    // ambiguous with the std::rank type trait on C++11 and later compilers.
    logFileName << "log_" << ::rank << ".txt";
    std::ofstream outfile(logFileName.str().c_str(), std::ios_base::app);
    outfile << str;
}
int main(int argc, char* argv[]){
map<int, boost::mpi::request> mpiReceiveRequest;
map<int, boost::mpi::request> mpiSendRequest;
map<int, Packet *> receivedData;
vector<int> neighbors;
boost::mpi::environment env(argc, argv);
boost::mpi::communicator world;
rank = world.rank();
if(rank == 0){
neighbors.push_back(1);
neighbors.push_back(3);
neighbors.push_back(4);
neighbors.push_back(5);
neighbors.push_back(7);
neighbors.push_back(12);
neighbors.push_back(13);
neighbors.push_back(15);
}
else if(rank == 1){
neighbors.push_back(0);
neighbors.push_back(2);
neighbors.push_back(4);
neighbors.push_back(5);
neighbors.push_back(6);
neighbors.push_back(12);
neighbors.push_back(13);
neighbors.push_back(14);
}
else if(rank == 2){
neighbors.push_back(1);
neighbors.push_back(3);
neighbors.push_back(5);
neighbors.push_back(6);
neighbors.push_back(7);
neighbors.push_back(13);
neighbors.push_back(14);
neighbors.push_back(15);
}
else if(rank == 3){
neighbors.push_back(0);
neighbors.push_back(2);
neighbors.push_back(4);
neighbors.push_back(6);
neighbors.push_back(7);
neighbors.push_back(12);
neighbors.push_back(14);
neighbors.push_back(15);
}
else if(rank == 4){
neighbors.push_back(0);
neighbors.push_back(1);
neighbors.push_back(3);
neighbors.push_back(5);
neighbors.push_back(7);
neighbors.push_back(8);
neighbors.push_back(9);
neighbors.push_back(11);
}
else if(rank == 5){
neighbors.push_back(0);
neighbors.push_back(1);
neighbors.push_back(2);
neighbors.push_back(4);
neighbors.push_back(6);
neighbors.push_back(8);
neighbors.push_back(9);
neighbors.push_back(10);
}
else if(rank == 6){
neighbors.push_back(1);
neighbors.push_back(2);
neighbors.push_back(3);
neighbors.push_back(5);
neighbors.push_back(7);
neighbors.push_back(9);
neighbors.push_back(10);
neighbors.push_back(11);
}
else if(rank == 7){
neighbors.push_back(0);
neighbors.push_back(2);
neighbors.push_back(3);
neighbors.push_back(4);
neighbors.push_back(6);
neighbors.push_back(8);
neighbors.push_back(10);
neighbors.push_back(11);
}
else if(rank == 8){
neighbors.push_back(4);
neighbors.push_back(5);
neighbors.push_back(7);
neighbors.push_back(9);
neighbors.push_back(11);
neighbors.push_back(12);
neighbors.push_back(13);
neighbors.push_back(15);
}
else if(rank == 9){
neighbors.push_back(4);
neighbors.push_back(5);
neighbors.push_back(6);
neighbors.push_back(8);
neighbors.push_back(10);
neighbors.push_back(12);
neighbors.push_back(13);
neighbors.push_back(14);
}
else if(rank == 10){
neighbors.push_back(5);
neighbors.push_back(6);
neighbors.push_back(7);
neighbors.push_back(9);
neighbors.push_back(11);
neighbors.push_back(13);
neighbors.push_back(14);
neighbors.push_back(15);
}
else if(rank == 11){
neighbors.push_back(4);
neighbors.push_back(6);
neighbors.push_back(7);
neighbors.push_back(8);
neighbors.push_back(10);
neighbors.push_back(12);
neighbors.push_back(14);
neighbors.push_back(15);
}
else if(rank == 12){
neighbors.push_back(0);
neighbors.push_back(1);
neighbors.push_back(3);
neighbors.push_back(8);
neighbors.push_back(9);
neighbors.push_back(11);
neighbors.push_back(13);
neighbors.push_back(15);
}
else if(rank == 13){
neighbors.push_back(0);
neighbors.push_back(1);
neighbors.push_back(2);
neighbors.push_back(8);
neighbors.push_back(9);
neighbors.push_back(10);
neighbors.push_back(12);
neighbors.push_back(14);
}
else if(rank == 14){
neighbors.push_back(1);
neighbors.push_back(2);
neighbors.push_back(3);
neighbors.push_back(9);
neighbors.push_back(10);
neighbors.push_back(11);
neighbors.push_back(13);
neighbors.push_back(15);
}
else if(rank == 15){
neighbors.push_back(0);
neighbors.push_back(2);
neighbors.push_back(3);
neighbors.push_back(8);
neighbors.push_back(10);
neighbors.push_back(11);
neighbors.push_back(12);
neighbors.push_back(14);
}
for(int i=0; i<8; i++){
Packet * packet = new Packet();
receivedData[neighbors[i]] = packet;
mpiReceiveRequest[neighbors[i]] = world.irecv(neighbors[i], 100, *packet);
}
for(int i=1; i<=10; i++){
if(i%5 == 0){ // Checkpoint time
for(int j=0; j<8; j++){
Packet * p = new Packet(neighbors[j], rank, i);
mpiSendRequest[neighbors[j]] = world.isend(neighbors[j], 100, *p);
log("Sending: ");
log(p->toString());
log("\n");
}
}
for(int j=0; j<8; j++){
if(mpiReceiveRequest[neighbors[j]].test()){
Packet * p = receivedData[neighbors[j]];
log("Received: ");
log(receivedData[neighbors[j]]->toString());
log("\n");
if(p->getData() != 10){
Packet * packet = new Packet();
receivedData[neighbors[j]] = packet;
mpiReceiveRequest[neighbors[j]] = world.irecv(neighbors[j], 100, *packet);
}
}
}
}
for(int i=0; i<8; i++){
stringstream ss;
ss << " Wait from: " << neighbors[i] << endl;
log(ss.str());
mpiReceiveRequest[neighbors[i]].wait();
log("Received: ");
log(receivedData[neighbors[i]]->toString());
log("\n");
}
stringstream ss;
ss << rank << " is done" << endl;
log(ss.str());
return 0;
}
问题是程序在调用 wait 时挂起。此外,一些接收到的消息包含无意义的数据。例如,进程 7 的输出文件如下:
Received: Packet = [Data: 5, Receiver: 7, Sender: 10]
Received: Packet = [Data: 5, Receiver: 7, Sender: 11]
Received: Packet = [Data: 5, Receiver: 7, Sender: 0]
Received: Packet = [Data: 5, Receiver: 7, Sender: 4]
Sending: Packet = [Data: 5, Receiver: 0, Sender: 7]
Sending: Packet = [Data: 5, Receiver: 2, Sender: 7]
Sending: Packet = [Data: 5, Receiver: 3, Sender: 7]
Sending: Packet = [Data: 5, Receiver: 4, Sender: 7]
Sending: Packet = [Data: 5, Receiver: 6, Sender: 7]
Sending: Packet = [Data: 5, Receiver: 8, Sender: 7]
Sending: Packet = [Data: 5, Receiver: 10, Sender: 7]
Sending: Packet = [Data: 5, Receiver: 11, Sender: 7]
Wait from: 0
Received: Packet = [Data: 537985024, Receiver: 0, Sender: 0]
Wait from: 2
我想不出问题出在哪里。
问题出在对此前已经通过 test() 检测为完成的请求再次调用 wait()。其实,我意识到这也是我之前的一个问题的原因:
所以,我修改了我的代码如下,现在它运行了:
/* ...
else if(rank == 15){
neighbors.push_back(0);
neighbors.push_back(2);
neighbors.push_back(3);
neighbors.push_back(8);
neighbors.push_back(10);
neighbors.push_back(11);
neighbors.push_back(12);
neighbors.push_back(14);
}
...After assigning neighbors
*/
// Tail of the revised main(). The declarations of neighbors, receivedData,
// mpiReceiveRequest, mpiSendRequest and world are not part of this excerpt.
//
// Post the initial non-blocking receive for each of the 8 neighbours. In this
// revision mpiReceiveRequest is indexed by the neighbour's position i, while
// receivedData is still keyed by the neighbour's rank.
for(int i=0; i<8; i++){
Packet * packet = new Packet();
receivedData[neighbors[i]] = packet;
mpiReceiveRequest[i] = world.irecv(neighbors[i], 100, *packet);
}
// Positions j whose receive completed with a packet carrying data == 10 and
// was therefore not re-posted.
vector<int> completed;
for(int i=1; i<=10; i++){
if(i%5 == 0){ // Checkpoint time
for(int j=0; j<8; j++){
Packet * p = new Packet(neighbors[j], rank, i);
// NOTE(review): push_back implies mpiSendRequest is a sequence container in
// this revision; its declaration is not shown here - confirm.
mpiSendRequest.push_back(world.isend(neighbors[j], 100, *p));
log("Sending: ");
log(p->toString());
log("\n");
}
}
for(int j=0; j<8; j++){
// Linear search: has position j already been recorded as completed?
vector<int>::iterator it = completed.begin();
bool passed = false;
while(it != completed.end()){
if(*it == j){
passed = true;
break;
}
it++;
}
// Only requests not recorded as completed are tested.
if(!passed){
if(mpiReceiveRequest[j].test()){
// Record j as completed first; the entry is removed again below when the
// receive is re-posted.
completed.push_back(j);
Packet * p = receivedData[neighbors[j]];
if(p->getData() != 10){
// Not the data == 10 packet: post a fresh receive for this neighbour and
// undo the push_back above so the new request is tested again later.
Packet * packet = new Packet();
receivedData[neighbors[j]] = packet;
mpiReceiveRequest[j] = world.irecv(neighbors[j], 100, *packet);
completed.pop_back();
}
}
}
}
}
// Copies of the requests waited on below; not read again within this excerpt.
vector<boost::mpi::request> reqs;
for(int i=0; i<8; i++){
// Same linear search as above, now for position i.
vector<int>::iterator it = completed.begin();
bool passed = false;
while(it != completed.end()){
if(*it == i){
passed = true;
break;
}
it++;
}
// wait() is called only on requests that were not recorded as completed.
if(!passed){
mpiReceiveRequest[i].wait();
reqs.push_back(mpiReceiveRequest[i]);
}
}
// Write the final "<rank> is done" line to this process's log.
stringstream ss;
ss << rank << " is done" << endl;
log(ss.str());
return 0;
} // End of main
主要区别是把已经完成的请求记录在一个向量中,最后不再对这些请求调用 wait()。
我有一个简单的异步消息传递程序,如下所示。我在两台 PC 上以 16 个进程运行它。这些进程按 4x4 的环面(torus)网格互相连接,因此在 main 函数的开头可以看到每个进程的邻居。我想实现一个检查点算法:当 i = 5 或 10 时(把 i 看作时间),每个进程都会向其所有邻居发送一条消息。进程每收到一条消息,就会针对同一个发送方重新发起一个新的 irecv;但如果收到的是 i = 10 的消息,则不再发起 irecv,因为此后不会再有消息到达。在程序结束时,所有进程等待尚未收到的消息。
/* Demo_01_Main.cpp */
#include <boost/mpi.hpp>
#include <boost/serialization/string.hpp>
#include <string>
#include <iostream>
#include <fstream>
#include <map>
using namespace std;
/**
 * Message exchanged between ranks.
 *
 * Stores three integers: the destination rank, the originating rank and an
 * integer payload. The class is serializable through Boost.Serialization so
 * that Boost.MPI can transmit it.
 */
class Packet{
    friend class boost::serialization::access;
private:
    int receiver;   // rank the packet is addressed to
    int sender;     // rank that created the packet
    int data;       // integer payload
public:
    /// Creates a packet with every field set to zero.
    Packet() : receiver(0), sender(0), data(0) {}

    /// Creates a packet with the given destination, origin and payload.
    Packet(int receiver, int sender, int data)
        : receiver(receiver), sender(sender), data(data) {}

    // No user-declared destructor: the class owns no resources, so the
    // compiler-generated copy operations and destructor are correct.

    /// @return the payload.
    int getData() const {
        return data;
    }
    /// Replaces the payload.
    void setData(int data) {
        this->data = data;
    }
    /// @return the destination rank.
    int getReceiver() const {
        return receiver;
    }
    /// Replaces the destination rank.
    void setReceiver(int receiver) {
        this->receiver = receiver;
    }
    /// @return the originating rank.
    int getSender() const {
        return sender;
    }
    /// Replaces the originating rank.
    void setSender(int sender) {
        this->sender = sender;
    }

    /**
     * Boost.Serialization hook: reads or writes the three fields, in the
     * order receiver, sender, data. The class version is not used, so the
     * parameter is left unnamed to avoid an unused-parameter warning.
     */
    template<class Archive>
    void serialize(Archive& ar, const unsigned int /*version*/) {
        ar & receiver;
        ar & sender;
        ar & data;
    }

    /// @return a human-readable description of the packet, used for logging.
    std::string toString() const {
        std::stringstream ss;
        ss << "Packet = [Data: " << data << ", Receiver: " << receiver << ", Sender: " << sender << "]";
        return ss.str();
    }
};
/// Rank of the current process; used by log() to pick the log file name.
int rank;

/**
 * Appends text to this process's log file, "log_<rank>.txt".
 *
 * The file is opened in append mode on every call and closed again when the
 * stream goes out of scope, so each call leaves the file complete on disk.
 *
 * @param str text to append; written verbatim, no newline is added.
 */
void log(const std::string& str){
    std::stringstream logFileName;
    // `::rank` is spelled with the global-scope qualifier on purpose: together
    // with a file-scope `using namespace std;` the bare name `rank` is
    // ambiguous with the std::rank type trait on C++11 and later compilers.
    logFileName << "log_" << ::rank << ".txt";
    std::ofstream outfile(logFileName.str().c_str(), std::ios_base::app);
    outfile << str;
}
int main(int argc, char* argv[]){
map<int, boost::mpi::request> mpiReceiveRequest;
map<int, boost::mpi::request> mpiSendRequest;
map<int, Packet *> receivedData;
vector<int> neighbors;
boost::mpi::environment env(argc, argv);
boost::mpi::communicator world;
rank = world.rank();
if(rank == 0){
neighbors.push_back(1);
neighbors.push_back(3);
neighbors.push_back(4);
neighbors.push_back(5);
neighbors.push_back(7);
neighbors.push_back(12);
neighbors.push_back(13);
neighbors.push_back(15);
}
else if(rank == 1){
neighbors.push_back(0);
neighbors.push_back(2);
neighbors.push_back(4);
neighbors.push_back(5);
neighbors.push_back(6);
neighbors.push_back(12);
neighbors.push_back(13);
neighbors.push_back(14);
}
else if(rank == 2){
neighbors.push_back(1);
neighbors.push_back(3);
neighbors.push_back(5);
neighbors.push_back(6);
neighbors.push_back(7);
neighbors.push_back(13);
neighbors.push_back(14);
neighbors.push_back(15);
}
else if(rank == 3){
neighbors.push_back(0);
neighbors.push_back(2);
neighbors.push_back(4);
neighbors.push_back(6);
neighbors.push_back(7);
neighbors.push_back(12);
neighbors.push_back(14);
neighbors.push_back(15);
}
else if(rank == 4){
neighbors.push_back(0);
neighbors.push_back(1);
neighbors.push_back(3);
neighbors.push_back(5);
neighbors.push_back(7);
neighbors.push_back(8);
neighbors.push_back(9);
neighbors.push_back(11);
}
else if(rank == 5){
neighbors.push_back(0);
neighbors.push_back(1);
neighbors.push_back(2);
neighbors.push_back(4);
neighbors.push_back(6);
neighbors.push_back(8);
neighbors.push_back(9);
neighbors.push_back(10);
}
else if(rank == 6){
neighbors.push_back(1);
neighbors.push_back(2);
neighbors.push_back(3);
neighbors.push_back(5);
neighbors.push_back(7);
neighbors.push_back(9);
neighbors.push_back(10);
neighbors.push_back(11);
}
else if(rank == 7){
neighbors.push_back(0);
neighbors.push_back(2);
neighbors.push_back(3);
neighbors.push_back(4);
neighbors.push_back(6);
neighbors.push_back(8);
neighbors.push_back(10);
neighbors.push_back(11);
}
else if(rank == 8){
neighbors.push_back(4);
neighbors.push_back(5);
neighbors.push_back(7);
neighbors.push_back(9);
neighbors.push_back(11);
neighbors.push_back(12);
neighbors.push_back(13);
neighbors.push_back(15);
}
else if(rank == 9){
neighbors.push_back(4);
neighbors.push_back(5);
neighbors.push_back(6);
neighbors.push_back(8);
neighbors.push_back(10);
neighbors.push_back(12);
neighbors.push_back(13);
neighbors.push_back(14);
}
else if(rank == 10){
neighbors.push_back(5);
neighbors.push_back(6);
neighbors.push_back(7);
neighbors.push_back(9);
neighbors.push_back(11);
neighbors.push_back(13);
neighbors.push_back(14);
neighbors.push_back(15);
}
else if(rank == 11){
neighbors.push_back(4);
neighbors.push_back(6);
neighbors.push_back(7);
neighbors.push_back(8);
neighbors.push_back(10);
neighbors.push_back(12);
neighbors.push_back(14);
neighbors.push_back(15);
}
else if(rank == 12){
neighbors.push_back(0);
neighbors.push_back(1);
neighbors.push_back(3);
neighbors.push_back(8);
neighbors.push_back(9);
neighbors.push_back(11);
neighbors.push_back(13);
neighbors.push_back(15);
}
else if(rank == 13){
neighbors.push_back(0);
neighbors.push_back(1);
neighbors.push_back(2);
neighbors.push_back(8);
neighbors.push_back(9);
neighbors.push_back(10);
neighbors.push_back(12);
neighbors.push_back(14);
}
else if(rank == 14){
neighbors.push_back(1);
neighbors.push_back(2);
neighbors.push_back(3);
neighbors.push_back(9);
neighbors.push_back(10);
neighbors.push_back(11);
neighbors.push_back(13);
neighbors.push_back(15);
}
else if(rank == 15){
neighbors.push_back(0);
neighbors.push_back(2);
neighbors.push_back(3);
neighbors.push_back(8);
neighbors.push_back(10);
neighbors.push_back(11);
neighbors.push_back(12);
neighbors.push_back(14);
}
for(int i=0; i<8; i++){
Packet * packet = new Packet();
receivedData[neighbors[i]] = packet;
mpiReceiveRequest[neighbors[i]] = world.irecv(neighbors[i], 100, *packet);
}
for(int i=1; i<=10; i++){
if(i%5 == 0){ // Checkpoint time
for(int j=0; j<8; j++){
Packet * p = new Packet(neighbors[j], rank, i);
mpiSendRequest[neighbors[j]] = world.isend(neighbors[j], 100, *p);
log("Sending: ");
log(p->toString());
log("\n");
}
}
for(int j=0; j<8; j++){
if(mpiReceiveRequest[neighbors[j]].test()){
Packet * p = receivedData[neighbors[j]];
log("Received: ");
log(receivedData[neighbors[j]]->toString());
log("\n");
if(p->getData() != 10){
Packet * packet = new Packet();
receivedData[neighbors[j]] = packet;
mpiReceiveRequest[neighbors[j]] = world.irecv(neighbors[j], 100, *packet);
}
}
}
}
for(int i=0; i<8; i++){
stringstream ss;
ss << " Wait from: " << neighbors[i] << endl;
log(ss.str());
mpiReceiveRequest[neighbors[i]].wait();
log("Received: ");
log(receivedData[neighbors[i]]->toString());
log("\n");
}
stringstream ss;
ss << rank << " is done" << endl;
log(ss.str());
return 0;
}
问题是程序在调用 wait 时挂起。此外,一些接收到的消息包含无意义的数据。例如,进程 7 的输出文件如下:
Received: Packet = [Data: 5, Receiver: 7, Sender: 10]
Received: Packet = [Data: 5, Receiver: 7, Sender: 11]
Received: Packet = [Data: 5, Receiver: 7, Sender: 0]
Received: Packet = [Data: 5, Receiver: 7, Sender: 4]
Sending: Packet = [Data: 5, Receiver: 0, Sender: 7]
Sending: Packet = [Data: 5, Receiver: 2, Sender: 7]
Sending: Packet = [Data: 5, Receiver: 3, Sender: 7]
Sending: Packet = [Data: 5, Receiver: 4, Sender: 7]
Sending: Packet = [Data: 5, Receiver: 6, Sender: 7]
Sending: Packet = [Data: 5, Receiver: 8, Sender: 7]
Sending: Packet = [Data: 5, Receiver: 10, Sender: 7]
Sending: Packet = [Data: 5, Receiver: 11, Sender: 7]
Wait from: 0
Received: Packet = [Data: 537985024, Receiver: 0, Sender: 0]
Wait from: 2
我想不出问题出在哪里。
问题出在对此前已经通过 test() 检测为完成的请求再次调用 wait()。其实,我意识到这也是我之前的一个问题的原因:
所以,我修改了我的代码如下,现在它运行了:
/* ...
else if(rank == 15){
neighbors.push_back(0);
neighbors.push_back(2);
neighbors.push_back(3);
neighbors.push_back(8);
neighbors.push_back(10);
neighbors.push_back(11);
neighbors.push_back(12);
neighbors.push_back(14);
}
...After assigning neighbors
*/
// Tail of the revised main(). The declarations of neighbors, receivedData,
// mpiReceiveRequest, mpiSendRequest and world are not part of this excerpt.
//
// Post the initial non-blocking receive for each of the 8 neighbours. In this
// revision mpiReceiveRequest is indexed by the neighbour's position i, while
// receivedData is still keyed by the neighbour's rank.
for(int i=0; i<8; i++){
Packet * packet = new Packet();
receivedData[neighbors[i]] = packet;
mpiReceiveRequest[i] = world.irecv(neighbors[i], 100, *packet);
}
// Positions j whose receive completed with a packet carrying data == 10 and
// was therefore not re-posted.
vector<int> completed;
for(int i=1; i<=10; i++){
if(i%5 == 0){ // Checkpoint time
for(int j=0; j<8; j++){
Packet * p = new Packet(neighbors[j], rank, i);
// NOTE(review): push_back implies mpiSendRequest is a sequence container in
// this revision; its declaration is not shown here - confirm.
mpiSendRequest.push_back(world.isend(neighbors[j], 100, *p));
log("Sending: ");
log(p->toString());
log("\n");
}
}
for(int j=0; j<8; j++){
// Linear search: has position j already been recorded as completed?
vector<int>::iterator it = completed.begin();
bool passed = false;
while(it != completed.end()){
if(*it == j){
passed = true;
break;
}
it++;
}
// Only requests not recorded as completed are tested.
if(!passed){
if(mpiReceiveRequest[j].test()){
// Record j as completed first; the entry is removed again below when the
// receive is re-posted.
completed.push_back(j);
Packet * p = receivedData[neighbors[j]];
if(p->getData() != 10){
// Not the data == 10 packet: post a fresh receive for this neighbour and
// undo the push_back above so the new request is tested again later.
Packet * packet = new Packet();
receivedData[neighbors[j]] = packet;
mpiReceiveRequest[j] = world.irecv(neighbors[j], 100, *packet);
completed.pop_back();
}
}
}
}
}
// Copies of the requests waited on below; not read again within this excerpt.
vector<boost::mpi::request> reqs;
for(int i=0; i<8; i++){
// Same linear search as above, now for position i.
vector<int>::iterator it = completed.begin();
bool passed = false;
while(it != completed.end()){
if(*it == i){
passed = true;
break;
}
it++;
}
// wait() is called only on requests that were not recorded as completed.
if(!passed){
mpiReceiveRequest[i].wait();
reqs.push_back(mpiReceiveRequest[i]);
}
}
// Write the final "<rank> is done" line to this process's log.
stringstream ss;
ss << rank << " is done" << endl;
log(ss.str());
return 0;
} // End of main
主要区别是把已经完成的请求记录在一个向量中,最后不再对这些请求调用 wait()。