分段错误多线程 C++ 11
Segmentation fault multithreading C++ 11
简介
我有一个包含 44 百万个名字的向量 entities
。我想把它分成 4 部分并并行处理每个部分。 Class Freebase
包含函数 loadData()
用于拆分向量并调用函数 multiThread
以进行处理。
loadEntities()
读取包含名称的文本文件。我没有把实现放在 class 因为它不重要
loadData()
将在构造函数中初始化的向量 entities
拆分为 4 部分,并将每个部分添加到 vector<thread> threads
中,如下所示:
threads.push_back(thread(&Freebase::multiThread, this, i, i + right, ref(data)));
- 多线程是我处理文件的函数
i
和i+right
是多线程的for循环中用来循环entity 的索引
returnValues
是multiThread
的子函数,用于调用外部函数。
问题
cout <<"Entity " << entities[i] << endl;
显示以下结果:
- 实体 m.0rzf6wv(正常)
- 实体 m.0rzf70(正常)
- 实体 m.068s4h9 m.0n_k8bz(错误)
- 实体实体 m.068s5_1(错误)
最后的 2 个输出是错误的。输出应该是:
Entity name
不是 entity entity name
也不是 entity name name
当输入被发送到函数 returnValues
时,这会导致分段错误。我该如何解决?
源代码
#ifndef FREEBASE_H
#define FREEBASE_H
class Freebase
{
public:
Freebase(const std::string &, const std::string &, const std::string &, const std::string &);
void loadData();
private:
std::string _serverURL;
std::string _entities;
std::string _xmlFile;
void multiThread(int,int, std::vector<std::pair<std::string, std::string>> &);
//private data members
std::vector<std::string> entities;
};
#endif
#include "Freebase.h"
#include "queries/SparqlQuery.h"
Freebase::Freebase(const string & url, const string & e, const string & xmlFile, const string & tfidfDatabase):_serverURL(url), _entities(e), _xmlFile(xmlFile), _tfidfDatabase(tfidfDatabase)
{
entities = loadEntities();
}
void Freebase::multiThread(int start, int end, vector<pair<string,string>> & data)
{
string basekb = "PREFIX basekb:<http://rdf.basekb.com/ns/> ";
for(int i = start; i < end; i++)
{
cout <<"Entity " << entities[i] << endl;
vector<pair<string, string>> description = returnValues(basekb + "select ?description where {"+ entities[i] +" basekb:common.topic.description ?description. FILTER (lang(?description) = 'en') }");
string desc = "";
for(auto &d: description)
{
desc += d.first + " ";
}
data.push_back(make_pair(entities[i], desc));
}
}
void Freebase::loadData()
{
vector<pair<string, string>> data;
vector<thread> threads;
int Size = entities.size();
//split database into 4 parts
int p = 4;
int right = round((double)Size / (double)p);
int left = Size % p;
float totalduration = 0;
vector<pair<int, int>> coordinates;
int counter = 0;
for(int i = 0; i < Size; i += right)
{
if(i < Size - right)
{
threads.push_back(thread(&Freebase::multiThread, this, i, i + right, ref(data)));
}
else
{
threads.push_back(thread(&Freebase::multiThread, this, i, Size, ref(data)));
}
}//end outer for
for(auto &t : threads)
{
t.join();
}
}
vector<pair<string, string>> Freebase::returnValues(const string & query)
{
vector<pair<string, string>> data;
SparqlQuery sparql(query, _serverURL);
string result = sparql.retrieveInformations();
istringstream str(result);
string line;
//skip first line
getline(str,line);
while(getline(str, line))
{
vector<string> values;
line.erase(remove( line.begin(), line.end(), '\"' ), line.end());
boost::split(values, line, boost::is_any_of("\t"));
if(values.size() == 2)
{
pair<string,string> fact = make_pair(values[0], values[1]);
data.push_back(fact);
}
else
{
data.push_back(make_pair(line, ""));
}
}
return data;
}//end function
编辑:
Arnon Zilca 的评论是正确的。您正在从多个线程(在 Freebase::multiThread()
中)写入单个向量,这是灾难的根源。您可以使用如下所述的互斥锁来保护 push_back 操作。
有关容器线程安全的更多信息,请参阅 Is std::vector or boost::vector thread safe?。
所以:
mtx.lock();
data.push_back(make_pair(entities[i], desc));
mtx.unlock();
另一种选择是使用与 returnValues 中相同的策略,在多线程中创建一个本地向量,并仅在线程完成处理时将内容推送到数据向量。
所以:
void Freebase::multiThread(int start, int end, vector<pair<string,string>> & data)
{
vector<pair<string,string>> threadResults;
string basekb = "PREFIX basekb:<http://rdf.basekb.com/ns/> ";
for(int i = start; i < end; i++)
{
cout <<"Entity " << entities[i] << endl;
vector<pair<string, string>> description = returnValues(basekb + "select ?description where {"+ entities[i] +" basekb:common.topic.description ?description. FILTER (lang(?description) = 'en') }");
string desc = "";
for(auto &d: description)
{
desc += d.first + " ";
}
threadResults.push_back(make_pair(entities[i], desc));
}
mtx.lock()
data.insert(data.end(), threadResults.begin(), threadResults.end());
mtx.unlock()
}
注意:我建议使用与您用于 cout 的互斥锁不同的互斥锁。总体结果向量 data
是与 cout
不同的资源。因此,想要使用 cout
的线程不必等待另一个线程完成 data
。
/编辑
您可以在
周围使用互斥锁
cout <<"Entity " << entities[i] << endl;
这将阻止多个线程在 "the same time" 处使用 cout。这样您就可以确保在另一个线程打印消息之前,一个线程打印了整条消息。请注意,这将影响您的性能,因为线程必须等待互斥体可用才能打印。
注意:保护 cout 只会清除流上的输出,不会影响其余代码的行为,请参见上文。
有关示例,请参阅 http://www.cplusplus.com/reference/mutex/mutex/lock/。
// mutex::lock/unlock
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex
std::mutex mtx; // mutex for critical section
void print_thread_id (int id) {
// critical section (exclusive access to std::cout signaled by locking mtx):
mtx.lock();
std::cout << "thread #" << id << '\n';
mtx.unlock();
}
int main ()
{
std::thread threads[10];
// spawn 10 threads:
for (int i=0; i<10; ++i)
threads[i] = std::thread(print_thread_id,i+1);
for (auto& th : threads) th.join();
return 0;
}
简介
我有一个包含 44 百万个名字的向量 entities
。我想把它分成 4 部分并并行处理每个部分。 Class Freebase
包含函数 loadData()
用于拆分向量并调用函数 multiThread
以进行处理。
loadEntities()
读取包含名称的文本文件。我没有把实现放在 class 因为它不重要loadData()
将在构造函数中初始化的向量entities
拆分为 4 部分,并将每个部分添加到vector<thread> threads
中,如下所示:
threads.push_back(thread(&Freebase::multiThread, this, i, i + right, ref(data)));
- 多线程是我处理文件的函数
i
和i+right
是多线程的for循环中用来循环entity 的索引
returnValues
是multiThread
的子函数,用于调用外部函数。
问题
cout <<"Entity " << entities[i] << endl;
显示以下结果:
- 实体 m.0rzf6wv(正常)
- 实体 m.0rzf70(正常)
- 实体 m.068s4h9 m.0n_k8bz(错误)
- 实体实体 m.068s5_1(错误)
最后的 2 个输出是错误的。输出应该是:
Entity name
不是entity entity name
也不是entity name name
当输入被发送到函数 returnValues
时,这会导致分段错误。我该如何解决?
源代码
#ifndef FREEBASE_H
#define FREEBASE_H
class Freebase
{
public:
Freebase(const std::string &, const std::string &, const std::string &, const std::string &);
void loadData();
private:
std::string _serverURL;
std::string _entities;
std::string _xmlFile;
void multiThread(int,int, std::vector<std::pair<std::string, std::string>> &);
//private data members
std::vector<std::string> entities;
};
#endif
#include "Freebase.h"
#include "queries/SparqlQuery.h"
Freebase::Freebase(const string & url, const string & e, const string & xmlFile, const string & tfidfDatabase):_serverURL(url), _entities(e), _xmlFile(xmlFile), _tfidfDatabase(tfidfDatabase)
{
entities = loadEntities();
}
void Freebase::multiThread(int start, int end, vector<pair<string,string>> & data)
{
string basekb = "PREFIX basekb:<http://rdf.basekb.com/ns/> ";
for(int i = start; i < end; i++)
{
cout <<"Entity " << entities[i] << endl;
vector<pair<string, string>> description = returnValues(basekb + "select ?description where {"+ entities[i] +" basekb:common.topic.description ?description. FILTER (lang(?description) = 'en') }");
string desc = "";
for(auto &d: description)
{
desc += d.first + " ";
}
data.push_back(make_pair(entities[i], desc));
}
}
void Freebase::loadData()
{
vector<pair<string, string>> data;
vector<thread> threads;
int Size = entities.size();
//split database into 4 parts
int p = 4;
int right = round((double)Size / (double)p);
int left = Size % p;
float totalduration = 0;
vector<pair<int, int>> coordinates;
int counter = 0;
for(int i = 0; i < Size; i += right)
{
if(i < Size - right)
{
threads.push_back(thread(&Freebase::multiThread, this, i, i + right, ref(data)));
}
else
{
threads.push_back(thread(&Freebase::multiThread, this, i, Size, ref(data)));
}
}//end outer for
for(auto &t : threads)
{
t.join();
}
}
vector<pair<string, string>> Freebase::returnValues(const string & query)
{
vector<pair<string, string>> data;
SparqlQuery sparql(query, _serverURL);
string result = sparql.retrieveInformations();
istringstream str(result);
string line;
//skip first line
getline(str,line);
while(getline(str, line))
{
vector<string> values;
line.erase(remove( line.begin(), line.end(), '\"' ), line.end());
boost::split(values, line, boost::is_any_of("\t"));
if(values.size() == 2)
{
pair<string,string> fact = make_pair(values[0], values[1]);
data.push_back(fact);
}
else
{
data.push_back(make_pair(line, ""));
}
}
return data;
}//end function
编辑:
Arnon Zilca 的评论是正确的。您正在从多个线程(在 Freebase::multiThread()
中)写入单个向量,这是灾难的根源。您可以使用如下所述的互斥锁来保护 push_back 操作。
有关容器线程安全的更多信息,请参阅 Is std::vector or boost::vector thread safe?。
所以:
mtx.lock();
data.push_back(make_pair(entities[i], desc));
mtx.unlock();
另一种选择是使用与 returnValues 中相同的策略,在多线程中创建一个本地向量,并仅在线程完成处理时将内容推送到数据向量。
所以:
void Freebase::multiThread(int start, int end, vector<pair<string,string>> & data)
{
vector<pair<string,string>> threadResults;
string basekb = "PREFIX basekb:<http://rdf.basekb.com/ns/> ";
for(int i = start; i < end; i++)
{
cout <<"Entity " << entities[i] << endl;
vector<pair<string, string>> description = returnValues(basekb + "select ?description where {"+ entities[i] +" basekb:common.topic.description ?description. FILTER (lang(?description) = 'en') }");
string desc = "";
for(auto &d: description)
{
desc += d.first + " ";
}
threadResults.push_back(make_pair(entities[i], desc));
}
mtx.lock()
data.insert(data.end(), threadResults.begin(), threadResults.end());
mtx.unlock()
}
注意:我建议使用与您用于 cout 的互斥锁不同的互斥锁。总体结果向量 data
是与 cout
不同的资源。因此,想要使用 cout
的线程不必等待另一个线程完成 data
。
/编辑
您可以在
周围使用互斥锁cout <<"Entity " << entities[i] << endl;
这将阻止多个线程在 "the same time" 处使用 cout。这样您就可以确保在另一个线程打印消息之前,一个线程打印了整条消息。请注意,这将影响您的性能,因为线程必须等待互斥体可用才能打印。
注意:保护 cout 只会清除流上的输出,不会影响其余代码的行为,请参见上文。
有关示例,请参阅 http://www.cplusplus.com/reference/mutex/mutex/lock/。
// mutex::lock/unlock
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex
std::mutex mtx; // mutex for critical section
void print_thread_id (int id) {
// critical section (exclusive access to std::cout signaled by locking mtx):
mtx.lock();
std::cout << "thread #" << id << '\n';
mtx.unlock();
}
int main ()
{
std::thread threads[10];
// spawn 10 threads:
for (int i=0; i<10; ++i)
threads[i] = std::thread(print_thread_id,i+1);
for (auto& th : threads) th.join();
return 0;
}