C++ 信号量混淆?
C++ Semaphore Confusion?
所以,我正在编写一种类似示波器的程序,它读取计算机上的串行端口并对这些数据执行 fft 以将其转换为频谱。我 运行 遇到了一个问题,尽管我的程序布局被分解为 SerialHandler
class(利用 boost::Asio
),FFTHandler
class,以及一个 main
函数。 SerialHandler
class 使用 boost::Asio`` async_read_some
函数从端口读取并引发一个名为 HandleOnPortReceive
的事件,然后该事件本身读取数据。
问题是我无法找到一种方法将该数据从事件处理程序传递到 FFTHandler
class,这是在另一个线程上。有人建议我使用信号量来解决我的问题,但我对 semaphore.h 的用法几乎一无所知,所以我的实现现在相当糟糕,没有做任何它应该做的事情。
下面是一些代码,如果这样可以使它更清楚一些的话:
using namespace Foo;
//main function
int main(void){
SerialHandler serialHandler;
FFTHandler fftHandler;
sem_t *qSem_ptr = &qSem;
sem_init(qSem_ptr, 1, 0);
//create separate threads for both the io_service and the AppendIn so that neither will block the user input statement following
serialHandler.StartConnection(tempInt, tempString); //these args are defined, but for brevity's sake, I ommitted the declaration
t2= new boost::thread(boost::bind(&FFTHandler::AppendIn, &fftHandler, q, qSem));
//allow the user to stop the program and avoid the problem of an infinite loop blocking the program
char inChar = getchar();
if (inChar) {...some logic to stop reading}
}
namespace Foo{
boost::thread *t1;
boost::thread *t2;
sem_t qSem;
std::queue<double> q;
boost::mutex mutex_;
class SerialHandler{
private:
char *rawBuffer; //array to hold incoming data
boost::asio::io_service ioService;
boost::asio::serial_port_ptr serialPort;
public:
void SerialHandler::StartConnection(int _baudRate, string _comPort){
//some functionality to open the port that is irrelevant to the question goes here
AsyncReadSome(); //starts the read loop
//create thread for io_service object and let function go out of scope
t1 = new boost::thread(boost::bind(&boost::asio::io_service::run, &ioService));
}
void SerialHandler::AsyncReadSome(){
//there's some other stuff here for error_catching, but this is the only important part
serialPort->async_read_some (
boost::asio::buffer(rawBuffer, SERIAL_PORT_READ_BUF_SIZE),
boost::bind(
&SerialHandler::HandlePortOnReceive,
this, boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred, q));
}
void SerialHandler::HandlePortOnReceive(const boost::system::error_code& error, size_t bytes_transferred, std::queue<double>& q){
boost::mutex::scoped_lock lock(mutex_);
//more error checking goes here, but I've made sure they aren't returning and are not the issue
for (unsigned int i =0; i<bytes_transferred; i++){
unsigned char c = rawBuffer[i];
double d = (double) c; //loop through buffer and read
if (c==endOfLineChar){
} else //if not delimiting char, push into queue and post semaphore
{
q.push(d);
//cout << d << endl;
sem_post(&qSem);
cout << q.front() << endl;
cout << "size is: " << q.size() << endl;
}
}
//loop back on itself and start the next read
AsyncReadSome();
}
}
class FFTHandler{
private:
double *in; //array to hold inputs
fftw_complex *out; //holds outputs
int currentIndex;
bool filled;
const int N;
public:
void AppendIn(std::queue<double> &q, sem_t &qSem){
while(1){ //this is supposed to stop thread from exiting and going out of scope...it doesn't do that at all effectively...
cout << "test" << endl;
sem_wait(&_qSem); //wait for data...this is blocking but I don't know why
double d = _q.front();
_q.pop();
in[currentIndex]=d; //read queue, pop, then append in array
currentIndex++;
if (currentIndex == N){ //run FFT if full and reset index
currentIndex = N-overlap-1;
filled = true;
RunFFT();
}
}
}
}
}
FFTHandler::AppendIn(..)
中的调试行确实在触发,因此正在创建线程,但它似乎立即超出了范围并破坏了线程,因为我似乎已经设置了 while对信号量的响应不正确。
TLDR:这是一个很长的解释,简单地说,“我不理解信号量,但需要以某种方式实现它们。我尝试过,但失败了,所以现在我来到这里希望能从比我更有知识的人那里得到关于这段代码的帮助。
更新: 所以在尝试了一些调试语句之后,问题似乎是 while(1){...}
语句确实在触发,但是 sem_wait(&_qSem);
导致它阻塞。无论出于何种原因,它都会无限期地等待,尽管正在发布信号量,但它会继续等待并且永远不会超出该线。
信号量的初始值为0,对本例有效。所以它需要 sem_post 才能解锁 FFTHandler::AppendIn()
。但是我没有看到第一次调用 SerialHandler::AsyncReadSome()
以读取串行端口并将其推送到队列中的代码。如果您修复了那部分代码,我认为 sem_post 会发生并且 FFTHandler 线程会 运行。作为第一步,您可以在 sem_wait 之后调试打印一个,在 AsyncReadSome() 函数内部打印一个,我猜这两个都不会被执行。
因此,基本上您需要确保 'reading' 作为主线程或其他线程的一部分启动并保持活动状态。
由于您已经在使用 boost::mutex
及其作用域锁类型,我建议您使用 boost::condition_variable
而不是 POSIX 信号量。否则,您会将 C++11 风格的同步与 POSIX 同步混合在一起。
您在添加到队列时锁定了互斥量,但我没有看到任何锁定互斥量以从队列中读取的内容。看起来您正在循环调用 AsyncReadSome
,而互斥量仍处于锁定状态。
选择一种同步形式,然后正确使用它。
所以,我正在编写一种类似示波器的程序,它读取计算机上的串行端口并对这些数据执行 fft 以将其转换为频谱。我 运行 遇到了一个问题,尽管我的程序布局被分解为 SerialHandler
class(利用 boost::Asio
),FFTHandler
class,以及一个 main
函数。 SerialHandler
class 使用 boost::Asio`` async_read_some
函数从端口读取并引发一个名为 HandleOnPortReceive
的事件,然后该事件本身读取数据。
问题是我无法找到一种方法将该数据从事件处理程序传递到 FFTHandler
class,这是在另一个线程上。有人建议我使用信号量来解决我的问题,但我对 semaphore.h 的用法几乎一无所知,所以我的实现现在相当糟糕,没有做任何它应该做的事情。
下面是一些代码,如果这样可以使它更清楚一些的话:
using namespace Foo;
//main function
int main(void){
SerialHandler serialHandler;
FFTHandler fftHandler;
sem_t *qSem_ptr = &qSem;
sem_init(qSem_ptr, 1, 0);
//create separate threads for both the io_service and the AppendIn so that neither will block the user input statement following
serialHandler.StartConnection(tempInt, tempString); //these args are defined, but for brevity's sake, I ommitted the declaration
t2= new boost::thread(boost::bind(&FFTHandler::AppendIn, &fftHandler, q, qSem));
//allow the user to stop the program and avoid the problem of an infinite loop blocking the program
char inChar = getchar();
if (inChar) {...some logic to stop reading}
}
namespace Foo{
boost::thread *t1;
boost::thread *t2;
sem_t qSem;
std::queue<double> q;
boost::mutex mutex_;
class SerialHandler{
private:
char *rawBuffer; //array to hold incoming data
boost::asio::io_service ioService;
boost::asio::serial_port_ptr serialPort;
public:
void SerialHandler::StartConnection(int _baudRate, string _comPort){
//some functionality to open the port that is irrelevant to the question goes here
AsyncReadSome(); //starts the read loop
//create thread for io_service object and let function go out of scope
t1 = new boost::thread(boost::bind(&boost::asio::io_service::run, &ioService));
}
void SerialHandler::AsyncReadSome(){
//there's some other stuff here for error_catching, but this is the only important part
serialPort->async_read_some (
boost::asio::buffer(rawBuffer, SERIAL_PORT_READ_BUF_SIZE),
boost::bind(
&SerialHandler::HandlePortOnReceive,
this, boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred, q));
}
void SerialHandler::HandlePortOnReceive(const boost::system::error_code& error, size_t bytes_transferred, std::queue<double>& q){
boost::mutex::scoped_lock lock(mutex_);
//more error checking goes here, but I've made sure they aren't returning and are not the issue
for (unsigned int i =0; i<bytes_transferred; i++){
unsigned char c = rawBuffer[i];
double d = (double) c; //loop through buffer and read
if (c==endOfLineChar){
} else //if not delimiting char, push into queue and post semaphore
{
q.push(d);
//cout << d << endl;
sem_post(&qSem);
cout << q.front() << endl;
cout << "size is: " << q.size() << endl;
}
}
//loop back on itself and start the next read
AsyncReadSome();
}
}
class FFTHandler{
private:
double *in; //array to hold inputs
fftw_complex *out; //holds outputs
int currentIndex;
bool filled;
const int N;
public:
void AppendIn(std::queue<double> &q, sem_t &qSem){
while(1){ //this is supposed to stop thread from exiting and going out of scope...it doesn't do that at all effectively...
cout << "test" << endl;
sem_wait(&_qSem); //wait for data...this is blocking but I don't know why
double d = _q.front();
_q.pop();
in[currentIndex]=d; //read queue, pop, then append in array
currentIndex++;
if (currentIndex == N){ //run FFT if full and reset index
currentIndex = N-overlap-1;
filled = true;
RunFFT();
}
}
}
}
}
FFTHandler::AppendIn(..)
中的调试行确实在触发,因此正在创建线程,但它似乎立即超出了范围并破坏了线程,因为我似乎已经设置了 while对信号量的响应不正确。
TLDR:这是一个很长的解释,简单地说,“我不理解信号量,但需要以某种方式实现它们。我尝试过,但失败了,所以现在我来到这里希望能从比我更有知识的人那里得到关于这段代码的帮助。
更新: 所以在尝试了一些调试语句之后,问题似乎是 while(1){...}
语句确实在触发,但是 sem_wait(&_qSem);
导致它阻塞。无论出于何种原因,它都会无限期地等待,尽管正在发布信号量,但它会继续等待并且永远不会超出该线。
信号量的初始值为0,对本例有效。所以它需要 sem_post 才能解锁 FFTHandler::AppendIn()
。但是我没有看到第一次调用 SerialHandler::AsyncReadSome()
以读取串行端口并将其推送到队列中的代码。如果您修复了那部分代码,我认为 sem_post 会发生并且 FFTHandler 线程会 运行。作为第一步,您可以在 sem_wait 之后调试打印一个,在 AsyncReadSome() 函数内部打印一个,我猜这两个都不会被执行。
因此,基本上您需要确保 'reading' 作为主线程或其他线程的一部分启动并保持活动状态。
由于您已经在使用 boost::mutex
及其作用域锁类型,我建议您使用 boost::condition_variable
而不是 POSIX 信号量。否则,您会将 C++11 风格的同步与 POSIX 同步混合在一起。
您在添加到队列时锁定了互斥量,但我没有看到任何锁定互斥量以从队列中读取的内容。看起来您正在循环调用 AsyncReadSome
,而互斥量仍处于锁定状态。
选择一种同步形式,然后正确使用它。