将数据用于互斥锁和等待时出现死锁
deadlock in using data for mutex and waits
我需要一些有关并发 C++ 编程的帮助。
我有一个名称文件,名为 "names.txt"
,格式如下:
0 James
1 Sara
2 Isaac
我还有另一个名为 "op.txt"
的文件,其中包含对名称文件的一些操作,格式如下:
0 1 + // this means add Sara to James and store it in 0 position
1 2 $ // this means swap values in position 1 and position 2
和一个包含操作输出的文件 "output.txt"
,格式如下:
0 JamesSara
1 Isaac
2 Sara
问题是创建一个线程来读取 names.txt
和 op.txt
并存储它们。接下来创建一些可变线程来并发操作,最后在线程中执行output.txt
。
这是我针对这个问题的代码,当并发线程数为
大于 2。但是 1 和 2 线程的输出不正确。
我在这段代码中遗漏了什么?
#include <fstream>
#include <iostream>
#include <vector>
#include <sstream>
#include <cstdlib>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <deque>
using namespace std;
std::mutex _opMutex;
std::condition_variable _initCondition;
std::condition_variable _operationCondition;
int _counter = 0;
int _initCounter = 0;
int _doOperationCounter = 0;
struct OperationStruct
{
int firstOperand;
int secondOperand;
char cOperator;
};
const int THREADS = 5;
std::deque<std::pair<int, string> > _nameVector;
std::deque<OperationStruct> _opStructVec;
void initNamesAndOperations()
{
ifstream infile;
std::pair<int, string> namePair;
infile.open("names.txt");
if (!infile)
{
cout << "Unable to open file";
exit(-1);
}
int id;
string value;
while (infile >> id >> value)
{
namePair.first = id;
namePair.second = value;
_nameVector.push_back(namePair);
}
infile.close();
infile.open("op.txt");
if (!infile)
{
cout << "Unable to open file";
exit(-1);
}
int firstOperand;
int secondOperand;
char cOperator;
while (infile >> firstOperand >> secondOperand >> cOperator)
{
OperationStruct opSt;
opSt.firstOperand = firstOperand;
opSt.secondOperand = secondOperand;
opSt.cOperator = cOperator;
_opStructVec.push_back(opSt);
++_initCounter;
}
infile.close();
return;
}
void doOperationMath(int firstIndex, string firstValue, string secondValue, char cOp)
{
//basic mathematics
switch (cOp)
{
case '+':
{
for (int i = 0; i < _nameVector.size(); ++i)
{
std::pair<int, string> acc = _nameVector[i];
if (acc.first == firstIndex)
{
acc.second = firstValue + secondValue;
_nameVector[i].second = acc.second;
}
}
}
break;
default:
break;
}
++_doOperationCounter;
}
void doOperationSwap(int firstIndex, int secondIndex, string firstValue, string secondValue)
{
//swap
for (int i = 0; i < _nameVector.size(); ++i)
{
if (_nameVector[i].first == firstIndex)
_nameVector[i].second = secondValue;
if (_nameVector[i].first == secondIndex)
_nameVector[i].second = firstValue;
}
++_doOperationCounter;
}
void doOperations()
{
while (_doOperationCounter < _initCounter)
{
std::unique_lock<mutex> locker(_opMutex);
_initCondition.wait(locker, [](){return !_opStructVec.empty(); });
OperationStruct opSt = _opStructVec.front();
_opStructVec.pop_front();
locker.unlock();
_operationCondition.notify_one();
int firstId = opSt.firstOperand;
int secondId = opSt.secondOperand;
char cOp = opSt.cOperator;
string firstValue = "";
string secondValue = "";
for (int j = 0; j < _nameVector.size(); ++j)
{
std::pair<int, string> acc = _nameVector[j];
if (firstId == acc.first)
firstValue = acc.second;
if (secondId == acc.first)
secondValue = acc.second;
}
if (cOp == '$')
{
doOperationSwap(firstId, secondId, firstValue, secondValue);
}
else
{
doOperationMath(firstId, firstValue, secondValue, cOp);
}
}
return;
}
void doOutputFile()
{
ofstream outfile;
outfile.open("sampleOutput.txt", std::ios::out | std::ios::app);
if (!outfile)
{
cout << "Unable to open the file";
exit(-1);
}
while (_counter < _initCounter)
{
std::unique_lock<mutex> locker(_opMutex);
_operationCondition.wait(locker, [](){return !_nameVector.empty(); });
auto accPair = _nameVector.front();
_nameVector.pop_front();
locker.unlock();
outfile << accPair.first << " " << accPair.second << endl;
++_counter;
}
return;
}
int main()
{
thread th1(initNamesAndOperations);
std::vector<thread> operationalThreads;
for (int i = 0; i < THREADS; ++i)
{
operationalThreads.push_back(thread(doOperations));
}
thread th3(doOutputFile);
th1.join();
for (auto& opthread : operationalThreads)
opthread.join();
th3.join();
return 0;
}
如果从多个线程修改变量,您可能必须使用一些同步来确保读取正确的值。最简单的方法可能是对变量使用 std::atomic
以确保操作顺序正确。
此外,您的代码中没有任何内容可以确保您的 doOperations
线程在您阅读整个文件之前不会完成。
显然,您需要先读取整个数据,或者有办法等待某些数据可用(或到达数据末尾)。如果读取初始数据很快但处理速度很慢,那么更简单的解决方案是在开始处理线程之前读取数据。
可能发生的情况是,如果您创建了很多线程,那么在您创建最后一个线程时,initNamesAndOperations
会读取整个文件。
我强烈建议您购买并阅读 Anthony Williams 的 C++ Concurrency in Action。通过阅读这本书,您将对现代 C++ 多线程有一个很好的理解,它将对您编写正确的代码有很大帮助。
我需要一些有关并发 C++ 编程的帮助。
我有一个名称文件,名为 "names.txt"
,格式如下:
0 James
1 Sara
2 Isaac
我还有另一个名为 "op.txt"
的文件,其中包含对名称文件的一些操作,格式如下:
0 1 + // this means add Sara to James and store it in 0 position
1 2 $ // this means swap values in position 1 and position 2
和一个包含操作输出的文件 "output.txt"
,格式如下:
0 JamesSara
1 Isaac
2 Sara
问题是创建一个线程来读取 names.txt
和 op.txt
并存储它们。接下来创建一些可变线程来并发操作,最后在线程中执行output.txt
。
这是我针对这个问题的代码,当并发线程数为 大于 2。但是 1 和 2 线程的输出不正确。 我在这段代码中遗漏了什么?
#include <fstream>
#include <iostream>
#include <vector>
#include <sstream>
#include <cstdlib>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <deque>
using namespace std;
std::mutex _opMutex;
std::condition_variable _initCondition;
std::condition_variable _operationCondition;
int _counter = 0;
int _initCounter = 0;
int _doOperationCounter = 0;
struct OperationStruct
{
int firstOperand;
int secondOperand;
char cOperator;
};
const int THREADS = 5;
std::deque<std::pair<int, string> > _nameVector;
std::deque<OperationStruct> _opStructVec;
void initNamesAndOperations()
{
ifstream infile;
std::pair<int, string> namePair;
infile.open("names.txt");
if (!infile)
{
cout << "Unable to open file";
exit(-1);
}
int id;
string value;
while (infile >> id >> value)
{
namePair.first = id;
namePair.second = value;
_nameVector.push_back(namePair);
}
infile.close();
infile.open("op.txt");
if (!infile)
{
cout << "Unable to open file";
exit(-1);
}
int firstOperand;
int secondOperand;
char cOperator;
while (infile >> firstOperand >> secondOperand >> cOperator)
{
OperationStruct opSt;
opSt.firstOperand = firstOperand;
opSt.secondOperand = secondOperand;
opSt.cOperator = cOperator;
_opStructVec.push_back(opSt);
++_initCounter;
}
infile.close();
return;
}
void doOperationMath(int firstIndex, string firstValue, string secondValue, char cOp)
{
//basic mathematics
switch (cOp)
{
case '+':
{
for (int i = 0; i < _nameVector.size(); ++i)
{
std::pair<int, string> acc = _nameVector[i];
if (acc.first == firstIndex)
{
acc.second = firstValue + secondValue;
_nameVector[i].second = acc.second;
}
}
}
break;
default:
break;
}
++_doOperationCounter;
}
void doOperationSwap(int firstIndex, int secondIndex, string firstValue, string secondValue)
{
//swap
for (int i = 0; i < _nameVector.size(); ++i)
{
if (_nameVector[i].first == firstIndex)
_nameVector[i].second = secondValue;
if (_nameVector[i].first == secondIndex)
_nameVector[i].second = firstValue;
}
++_doOperationCounter;
}
void doOperations()
{
while (_doOperationCounter < _initCounter)
{
std::unique_lock<mutex> locker(_opMutex);
_initCondition.wait(locker, [](){return !_opStructVec.empty(); });
OperationStruct opSt = _opStructVec.front();
_opStructVec.pop_front();
locker.unlock();
_operationCondition.notify_one();
int firstId = opSt.firstOperand;
int secondId = opSt.secondOperand;
char cOp = opSt.cOperator;
string firstValue = "";
string secondValue = "";
for (int j = 0; j < _nameVector.size(); ++j)
{
std::pair<int, string> acc = _nameVector[j];
if (firstId == acc.first)
firstValue = acc.second;
if (secondId == acc.first)
secondValue = acc.second;
}
if (cOp == '$')
{
doOperationSwap(firstId, secondId, firstValue, secondValue);
}
else
{
doOperationMath(firstId, firstValue, secondValue, cOp);
}
}
return;
}
void doOutputFile()
{
ofstream outfile;
outfile.open("sampleOutput.txt", std::ios::out | std::ios::app);
if (!outfile)
{
cout << "Unable to open the file";
exit(-1);
}
while (_counter < _initCounter)
{
std::unique_lock<mutex> locker(_opMutex);
_operationCondition.wait(locker, [](){return !_nameVector.empty(); });
auto accPair = _nameVector.front();
_nameVector.pop_front();
locker.unlock();
outfile << accPair.first << " " << accPair.second << endl;
++_counter;
}
return;
}
int main()
{
thread th1(initNamesAndOperations);
std::vector<thread> operationalThreads;
for (int i = 0; i < THREADS; ++i)
{
operationalThreads.push_back(thread(doOperations));
}
thread th3(doOutputFile);
th1.join();
for (auto& opthread : operationalThreads)
opthread.join();
th3.join();
return 0;
}
如果从多个线程修改变量,您可能必须使用一些同步来确保读取正确的值。最简单的方法可能是对变量使用 std::atomic
以确保操作顺序正确。
此外,您的代码中没有任何内容可以确保您的 doOperations
线程在您阅读整个文件之前不会完成。
显然,您需要先读取整个数据,或者有办法等待某些数据可用(或到达数据末尾)。如果读取初始数据很快但处理速度很慢,那么更简单的解决方案是在开始处理线程之前读取数据。
可能发生的情况是,如果您创建了很多线程,那么在您创建最后一个线程时,initNamesAndOperations
会读取整个文件。
我强烈建议您购买并阅读 Anthony Williams 的 C++ Concurrency in Action。通过阅读这本书,您将对现代 C++ 多线程有一个很好的理解,它将对您编写正确的代码有很大帮助。