Segmentation fault with multithreading in C++11

Introduction

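I have a vector entities containing 44 million names. I want to split it into 4 parts and process each part in parallel. The class Freebase contains the function loadData(), which splits the vector and calls the function multiThread to do the processing.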


threads.push_back(thread(&Freebase::multiThread, this, i, i + right, ref(data)));
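As a rough illustration of the splitting step, here is a minimal sketch of how the index range could be divided into contiguous chunks. The helper name splitRange is hypothetical and not part of the posted Freebase class; it only computes the start/end pairs that would be passed to multiThread.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical helper (not part of the original Freebase class): split the
// index range [0, size) into at most `parts` contiguous half-open ranges
// whose lengths differ by at most one element.
std::vector<std::pair<std::size_t, std::size_t>>
splitRange(std::size_t size, std::size_t parts)
{
    assert(parts > 0);
    std::vector<std::pair<std::size_t, std::size_t>> ranges;
    const std::size_t base  = size / parts;  // minimum length of each range
    const std::size_t extra = size % parts;  // the first `extra` ranges get one more element
    std::size_t begin = 0;
    for (std::size_t k = 0; k < parts && begin < size; ++k) {
        const std::size_t len = base + (k < extra ? 1 : 0);
        ranges.emplace_back(begin, begin + len);  // half-open range [begin, begin + len)
        begin += len;
    }
    return ranges;
}
```

Each returned pair could be used as the (start, end) arguments of one thread. Compared with the round()-based loop in loadData(), this never produces more than `parts` ranges and also handles a size smaller than `parts`. With the posted loop, 9 elements and p = 4 would start 5 threads, and a single element would give a step of 0, so the loop would not terminate.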

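Problem

cout <<"Entity " << entities[i] << endl; displays the following results:

The last 2 outputs are wrong. The output should be:

When this input is passed to the function returnValues, it causes a segmentation fault. How can I fix this?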


Source code

#ifndef FREEBASE_H
#define FREEBASE_H

#include <string>   // std::string
#include <utility>  // std::pair
#include <vector>   // std::vector

class Freebase
{
 public:
    Freebase(const std::string &, const std::string &, const std::string &, const std::string &);
    void loadData();
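 private:
   std::string _serverURL;
   std::string _entities;
   std::string _xmlFile;
   std::string _tfidfDatabase;  // initialized in the constructor but missing from the posted header
   void multiThread(int,int, std::vector<std::pair<std::string, std::string>> &);
   // Called in the .cpp but not declared in the posted header.
   // loadEntities() is not shown in the question; its signature here is an assumption.
   std::vector<std::string> loadEntities();
   std::vector<std::pair<std::string, std::string>> returnValues(const std::string &);
   //private data members
   std::vector<std::string> entities;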
};

#endif

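#include "Freebase.h"
#include "queries/SparqlQuery.h"

#include <algorithm>   // std::remove
#include <cmath>       // round
#include <functional>  // std::ref
#include <iostream>    // std::cout
#include <sstream>     // std::istringstream
#include <thread>      // std::thread
#include <boost/algorithm/string.hpp>  // boost::split, boost::is_any_of

using namespace std;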

Freebase::Freebase(const string & url, const string & e, const string & xmlFile, const string & tfidfDatabase):_serverURL(url), _entities(e), _xmlFile(xmlFile), _tfidfDatabase(tfidfDatabase)
{
  entities = loadEntities();
}

void Freebase::multiThread(int start, int end, vector<pair<string,string>> & data)
{
  string basekb = "PREFIX basekb:<http://rdf.basekb.com/ns/> ";
  for(int i = start; i < end; i++)
  {
     cout <<"Entity " << entities[i] << endl;
     vector<pair<string, string>> description = returnValues(basekb + "select ?description where {"+ entities[i] +" basekb:common.topic.description ?description. FILTER (lang(?description) = 'en') }");
     string desc = "";
     for(auto &d: description)
     {
       desc += d.first + " ";
     }
     data.push_back(make_pair(entities[i], desc));
  }
}


void Freebase::loadData()
{
  vector<pair<string, string>> data;
  vector<thread> threads;
  int Size = entities.size();
  //split database into 4 parts
  int p = 4;
  int right = round((double)Size / (double)p);
  int left = Size % p;
  float totalduration = 0;
  
  vector<pair<int, int>> coordinates;
  int counter = 0;
  for(int i = 0; i < Size; i += right)
  {

      if(i < Size - right)
      {
      threads.push_back(thread(&Freebase::multiThread, this, i, i + right, ref(data)));
      }
      else
      {
      threads.push_back(thread(&Freebase::multiThread, this, i, Size, ref(data)));
      }
      
  }//end outer for
  
   for(auto &t : threads)
   {
      t.join();
   }
   
}


vector<pair<string, string>>  Freebase::returnValues(const string & query)
{
  vector<pair<string, string>> data;
  SparqlQuery sparql(query, _serverURL);
  string result = sparql.retrieveInformations();
  istringstream str(result);
  string line;
  //skip first line
  getline(str,line);
  while(getline(str, line))
  {
    vector<string> values;
    line.erase(remove( line.begin(), line.end(), '\"' ), line.end());
    
    boost::split(values, line, boost::is_any_of("\t"));
    if(values.size() == 2)
    {
      pair<string,string> fact = make_pair(values[0], values[1]);
      data.push_back(fact);
    }
    else
    {
      data.push_back(make_pair(line, ""));
    }
  }
  
  return data;
}//end function

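Edit: Arnon Zilca's comment is correct. You are writing to a single vector from multiple threads (in Freebase::multiThread()), which is a recipe for disaster. std::vector does not synchronize concurrent push_back calls, so this is a data race that can corrupt the vector and crash. You can protect the push_back operation with a mutex, as described below.

For more on the thread safety of containers, see Is std::vector or boost::vector thread safe?

So: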

mtx.lock();
data.push_back(make_pair(entities[i], desc));
mtx.unlock();
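The same protection can also be written with std::lock_guard, which releases the mutex automatically when it goes out of scope, even if push_back throws. The following is only a self-contained sketch: worker, runWorkers and dataMutex are hypothetical names, and the SPARQL query is replaced by a dummy string.

```cpp
#include <algorithm>
#include <cassert>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

using Results = std::vector<std::pair<std::string, std::string>>;

std::mutex dataMutex;  // hypothetical name; guards every access to the shared vector

// Stand-in for Freebase::multiThread: the query is replaced by a dummy string.
void worker(int start, int end, Results& data)
{
    for (int i = start; i < end; ++i) {
        std::string desc = "description " + std::to_string(i);
        // The lock is held only for the push_back and released at the end of the iteration.
        std::lock_guard<std::mutex> lock(dataMutex);
        data.push_back(std::make_pair("entity " + std::to_string(i), desc));
    }
}

// Runs up to `threadCount` workers over [0, total) and returns the combined results.
Results runWorkers(int total, int threadCount)
{
    assert(threadCount > 0);
    Results data;
    std::vector<std::thread> threads;
    const int chunk = (total + threadCount - 1) / threadCount;  // ceiling division
    for (int start = 0; start < total; start += chunk) {
        const int end = std::min(start + chunk, total);
        threads.emplace_back(worker, start, end, std::ref(data));
    }
    for (auto& t : threads) t.join();
    return data;
}
```

The order of the elements in the result depends on thread scheduling, so only the total count is predictable.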

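Another option is to use the same strategy as in returnValues: create a local vector in multiThread and only push its contents to the data vector once the thread has finished processing. The mutex is then locked once per thread instead of once per entity.

So: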

void Freebase::multiThread(int start, int end, vector<pair<string,string>> & data)
{
  vector<pair<string,string>> threadResults;
  string basekb = "PREFIX basekb:<http://rdf.basekb.com/ns/> ";
  for(int i = start; i < end; i++)
  {
     cout <<"Entity " << entities[i] << endl;
     vector<pair<string, string>> description = returnValues(basekb + "select ?description where {"+ entities[i] +" basekb:common.topic.description ?description. FILTER (lang(?description) = 'en') }");
     string desc = "";
     for(auto &d: description)
     {
       desc += d.first + " ";
     }
     threadResults.push_back(make_pair(entities[i], desc));
  }
  mtx.lock();
  data.insert(data.end(), threadResults.begin(), threadResults.end());
  mtx.unlock();
}
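A variation on the local-vector idea, not taken from the code above, avoids the mutex entirely: each thread gets its own result vector, and the parts are merged after all threads have been joined. This is a sketch under assumptions; fillChunk and processInChunks are hypothetical names and the query is replaced by dummy strings.

```cpp
#include <algorithm>
#include <cassert>
#include <functional>
#include <string>
#include <thread>
#include <utility>
#include <vector>

using Results = std::vector<std::pair<std::string, std::string>>;

// Stand-in for Freebase::multiThread; `out` is written by exactly one thread.
void fillChunk(int start, int end, Results& out)
{
    for (int i = start; i < end; ++i)
        out.emplace_back("entity " + std::to_string(i), "description " + std::to_string(i));
}

Results processInChunks(int total, int threadCount)
{
    assert(threadCount > 0);
    // Sized up front and never resized while the threads are running.
    std::vector<Results> perThread(threadCount);
    std::vector<std::thread> threads;
    const int chunk = (total + threadCount - 1) / threadCount;  // ceiling division
    int slot = 0;
    for (int start = 0; start < total; start += chunk, ++slot) {
        const int end = std::min(start + chunk, total);
        threads.emplace_back(fillChunk, start, end, std::ref(perThread[slot]));
    }
    for (auto& t : threads) t.join();

    // Single-threaded merge after join(): no locking needed.
    Results data;
    for (auto& part : perThread)
        data.insert(data.end(), part.begin(), part.end());
    return data;
}
```

Because the parts are merged in slot order, the final vector keeps the original order of the entities, which the mutex-based versions do not guarantee.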

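Note: I'd recommend using a different mutex for this than the one you use for cout. The overall result vector data is a different resource than cout, so threads that want to use cout should not have to wait for another thread to finish with data.

/Edit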
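To illustrate the point about separate mutexes, here is a small sketch in which printing and storing are guarded independently. The names coutMutex, dataMutex, logAndStore and runLogAndStore are hypothetical, and the work per thread is reduced to storing an integer.

```cpp
#include <cassert>
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

std::mutex coutMutex;  // hypothetical name: guards std::cout only
std::mutex dataMutex;  // hypothetical name: guards the shared result vector only

void logAndStore(int id, std::vector<int>& data)
{
    {
        std::lock_guard<std::mutex> lock(coutMutex);
        std::cout << "Entity " << id << '\n';
    }   // coutMutex is released here, before dataMutex is taken
    {
        std::lock_guard<std::mutex> lock(dataMutex);
        data.push_back(id);
    }
}

// Starts `threadCount` threads and returns the values they stored.
std::vector<int> runLogAndStore(int threadCount)
{
    assert(threadCount >= 0);
    std::vector<int> data;
    std::vector<std::thread> threads;
    for (int i = 0; i < threadCount; ++i)
        threads.emplace_back(logAndStore, i, std::ref(data));
    for (auto& t : threads) t.join();
    return data;
}
```

A thread that is printing never blocks a thread that is appending to data, and the two locks are never held at the same time, so there is no lock-ordering issue.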

You can use a mutex around

cout <<"Entity " << entities[i] << endl;

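This prevents multiple threads from using cout at "the same time". That way you can be sure that one thread prints its whole message before another thread prints its own. Note that this will affect your performance, because a thread has to wait for the mutex to become available before it can print.

Note: protecting cout only cleans up the output on the stream. It does not change the behavior of the rest of the code; see above.

For an example, see http://www.cplusplus.com/reference/mutex/mutex/lock/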

// mutex::lock/unlock
#include <iostream>       // std::cout
#include <thread>         // std::thread
#include <mutex>          // std::mutex

std::mutex mtx;           // mutex for critical section

void print_thread_id (int id) {
  // critical section (exclusive access to std::cout signaled by locking mtx):
  mtx.lock();
  std::cout << "thread #" << id << '\n';
  mtx.unlock();
}

int main ()
{
  std::thread threads[10];
  // spawn 10 threads:
  for (int i=0; i<10; ++i)
    threads[i] = std::thread(print_thread_id,i+1);

  for (auto& th : threads) th.join();

  return 0;
}