为什么我在 openmp 中的每个 运行 中得到不同的输出

Why I am getting different output in every run in openmp

我有N个文本文件。我试图并行读取这些文件,所以我分叉了 N 个线程,每个线程从这 N 个文件中获取一个文本文件(线程 0 获取 file0.txt,线程 1 获取 file1.txt,线程 2 获取 file2.txt 等等)。我要尝试做的下一步是从文件中并行读取前 1000 行,然后放入向量 V1,然后我将对 V1 进行排序。除 V1 外,所有资源对每个胎面都是私有的。但是排序后我每次都会得到不同的输出。我的代码如下。 N的值为395。在此先感谢。

#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <vector>
#include <iostream>
#include <stdlib.h>
#include <stdint.h>
#include <limits.h>
#include <algorithm>
#include <fstream>
#include <string.h>
#include <math.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/io.h>
#include <sys/mman.h>
#include <bits/stdc++.h>
#include <ctype.h>      
#include <algorithm>
#include <omp.h>

typedef struct Data
{

    int  POS;
    char Record[600];
};

using namespace std;

std::vector<Data> sortDataOnDevice(std::vector<Data>,int);

struct comparator_sort
{
    __device__

    bool operator() (const Data &x, const Data& y)
    {
        return x.POS < y.POS;
    }
};

int sortedFilePos;

#pragma omp threadprivate (sortedFilePos)

void externalSorting(void)
{
    string tempString;
    int count = 0;
    stringstream s;
    std::vector<string> result;
    std::vector<Data>   V1;
    std::vector<Data>  finalSorted;
    std::vector<Data> temp1;
    string line;
    string word;
    
    int fp1 = 0;
    omp_set_num_threads(395);
    outfile.open("/ExtrSort/Sorted.txt");
    int firstTrav = 0;
        #pragma omp parallel for private(tempString,line,word,firstTrav) shared(V1)
            for(int i=0;i<395;i++ )
            {
                    int vecIdx = 0;
                    int beg = 0, end = 0;
                    int chrNo;
                    int tid;
                    
                    int fileReadIdx = 0;
                    stringstream intToString;
                    intToString << i;
                    intToString >> tempString;
                    std::string name = "/ExtrSort/Chr_Iteration" + tempString + ".txt";
    

                    cout<<name<<"     "<<omp_get_thread_num()<<endl;
                    
                    std::ifstream sortedFile(name.c_str());

                    if(!sortedFile)
                        cout<<"Fie openning error"<<endl;

                    
                        for(int n=0;n<1000;n++)
                        {

                            int wordCount = 0;

                            int chrFlag = 0;


                            Data *sd = (Data*)malloc(sizeof(Data));
                            
                            getline(sortedFile, line);
                        
                            stringstream s(line);
                            while(s>>word)
                            {
                                wordCount++;
                                if (wordCount == 4)
                                {

                                    strcpy(sd->wholeRecord,line.c_str());
                                    s >> word;
                                    stringstream geek(word);
                                    geek>>sd->POS;
                                    geek.clear();

                                    if(n==999)
                                    {

                                        int fp = sortedFile.tellg();


                                        sortedFilePos = fp;

                                    }

                                    break;
                                }
                            }
                            #pragma omp critical//(fill)
                            {
                                V1.push_back(*sd);
                                free(sd);
                                sd = NULL;
                            }


                        }//for(int n=0;n<1000;n++)
            }
            cout<<file1.size()<<endl;

            finalSorted = sortDataOnDevice(V1,0);

            cout<<finalSorted[0].wholeRecord<<endl;

            V1.clear();

            outfile.close();
        
        
}

std::vector<Data> sortDataOnDevice(std::vector<SAMData> dataX, int composite)
{

    thrust::device_vector<Data> dDataX(dataX.size());
    thrust::copy(dataX.begin(),dataX.end(),dDataX.begin());



    cout<<"Copied"<<endl;

    thrust::stable_sort(dDataX.begin(),dDataX.end(),comparator_sort());

    cout<<"Sorted"<<endl;

    std::vector<Data> sortedDataHost(dDataX.size());
    thrust::copy(dDataX.begin(),dDataX.end(),sortedDataHost.begin());

    dataX.clear();
    dDataX.clear();

    cout<<"After sort"<<endl<<endl<<endl;

    return sortedDataHost;
    
}

我不确定原始代码是什么样的,因为提供的示例中有几十个错误,原因是缺少变量、缺少包含以及对 non-existent 结构字段的引用。这是一个简化版本,其中包含我认为的修复方法。 OpenMP pragmas 似乎基本上是正确的,但是在将记录添加到向量之前,没有检查记录是否实际填充到 while 循环中。由于未初始化数据的行为方式,请注意使用 malloc 虽然在 C++ 中肯定没问题意味着对象是在不一致的未初始化状态下创建的,因此您可能将不同的垃圾数据切片发送到向量中.

以下版本有以下主要区别:

  1. 已为 C++ 正确声明数据(typedef 需要在声明后 命名,C++ 默认提供 typedef)。
  2. POS 在构造 Data 结构时初始化为 -1,此语法需要 C++11 和下面 sd 声明的更改。
  3. 我没有将变量私有化,而是将它们的声明移到了适当的范围内,因此它们自然地限定在每个线程中。这对性能更好,在我看来对可读性更好,但这不是必需的。
  4. sd 在堆栈上声明,而不是作为指向分配缓冲区的指针。如果它 必须 使用 malloc 分配,则必须使用 placement new 对其进行初始化以获得正确的行为。
  5. 为了支持直接使用 std::to_string 整数并在适当的地方直接将值读取为整数,已删除各种字符串流。
  6. 关键的变化,只有在 while 循环后 POS 被设置为新值时,记录才会添加到向量中。

由于这是一个不完整的示例,我不确定这是否会解决问题,但应用这些更改后代码应该更容易理解。

#include <ctype.h>
#include <fstream>
#include <iostream>
#include <limits.h>
#include <math.h>
#include <omp.h>
#include <sstream>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <vector>

struct Data {

  int POS = -1;
  string Record;
};

using namespace std;

std::vector<Data> sortDataOnDevice(std::vector<Data>, int);

struct comparator_sort {
  bool operator()(const Data& x, const Data& y) { return x.POS < y.POS; }
};

int sortedFilePos;

#pragma omp threadprivate(sortedFilePos)

void externalSorting(void)
{
  std::ofstream outfile;
  std::vector<string> result;
  std::vector<Data> V1;
  std::vector<Data> finalSorted;
  std::vector<Data> temp1;

  int fp1 = 0;
  omp_set_num_threads(395);
  outfile.open("/ExtrSort/Sorted.txt");
#pragma omp parallel for shared(V1) default(none)
  for (int i = 0; i < 395; i++) {

    std::string name = "/ExtrSort/Chr_Iteration" + std::to_string(i) + ".txt";

    cout << name << "     " << omp_get_thread_num() << endl;

    std::ifstream sortedFile(name);

    if (!sortedFile) {
      cout << "Fie openning error" << endl;
    }

    for (int n = 0; n < 1000; n++) {
      Data sd {}; // create and initialize on the stack
      int wordCount = 0;
      int chrFlag = 0;
      string line;
      getline(sortedFile, line);

      string word;
      istringstream s(line);
      while (s >> word) {
        wordCount++;
        if (wordCount == 4) {
          sd.Record = line;
          s >> sd.POS;
          if (n == 999) {
            int fp = sortedFile.tellg();
            sortedFilePos = fp;
          }
          break;
        }
      }
      if (POS != -1) { // KEY CHANGE
#pragma omp critical //(fill)
        {
          V1.push_back(sd);
        }
      }
    } // for(int n=0;n<1000;n++)
  }
  /* cout << file1.size() << endl; */

  finalSorted = sortDataOnDevice(V1, 0);

  cout << finalSorted[0].Record << endl;

  V1.clear();

  outfile.close();
}

如果出于某种原因您需要原始结构,唯一需要的更改应该是检查记录是否有效,并将 malloc 替换为 new 或在分配内存之后在使用前使用 placement new 初始化记录。