多线程使用者使用boost ringbuffer不产生与输入相同的输出
multithreading consumer not producing same output as input using boost ringbuffer
在我的主程序中,我将一个字符串缓冲区复制到一个增强环形缓冲区中,然后尝试在创建的线程中使用该数据并写入文件。在主线程中,我也在将相同的数据写入文件,但输入和输出文件不匹配。
我觉得我在做一些非常愚蠢的事情。请帮忙。此外,如果有任何改进代码的建议,我们将不胜感激。
#include <iostream>
#include <vector>
#include <boost/circular_buffer.hpp>
#include <numeric>
#include <assert.h>
#include <thread>
#include <mutex>
#include <chrono>
#include <time.h>
#include <cstdint>
#include <fstream>
#include <string>
using std::cin;
using std::cout;
using std::endl;
using std::fstream;
using std::string;
#define SOME_FIXED_HARDCODED_NUMBER 40980
class MyClass {
public:
std::vector<int8_t> vec;
public:
MyClass(std::vector<int8_t> v){ vec = v; }
};
boost::circular_buffer<MyClass> cb(300);
int waiting = 1;
std::mutex my_mutex;
FILE *out_file;
FILE *in_file;
void foo()
{
while (waiting) {
std::unique_lock<std::mutex> lock(my_mutex);
if (!cb.size() || waiting == 0) {
lock.unlock();
continue;
}
if (!waiting)
break;
MyClass local_buf = cb.front();
cb.pop_front();
fwrite(local_buf.vec.data(), 1, local_buf.vec.size(), out_file);
}
}
int main(int argc, char* argv[])
{
out_file = fopen("output_data.raw", "w");
in_file = fopen("input_data.raw", "w");
std::thread th1(foo);
char *buf = {"abc"};
int counter = 0;
std::vector<int8_t> mem;
mem.insert(mem.end(), buf, buf + strlen(buf));
while (counter < SOME_FIXED_HARDCODED_NUMBER)
{
{
std::unique_lock<std::mutex> lock(my_mutex);
/* if the circular buffer is full then wait for consumer to pull the data */
while (cb.full()) {
lock.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::unique_lock<std::mutex> lock(my_mutex);
}
cb.push_front(MyClass(mem));
fwrite(mem.data(), 1, mem.size(), in_file);
}
counter++;
}
waiting = 0;
th1.join();
fclose(out_file);
fclose(in_file);
return 0;
}
while (cb.full()) {
lock.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
>>> std::unique_lock<std::mutex> lock(my_mutex);
}
标记的 unique_lock
不会执行任何操作,因为它会立即超出范围并解锁互斥体。因此,一旦你离开循环,互斥锁就不会被锁定,你就会遇到竞争条件。相反,您应该使用 lock.lock()
重新锁定互斥量。
还有一些错误。您不是在等待 foo
线程实际耗尽缓冲区。一旦等待标志被主线程设置,它就会停止。另外,等待应该是原子的。
在我的主程序中,我将一个字符串缓冲区复制到一个增强环形缓冲区中,然后尝试在创建的线程中使用该数据并写入文件。在主线程中,我也在将相同的数据写入文件,但输入和输出文件不匹配。
我觉得我在做一些非常愚蠢的事情。请帮忙。此外,如果有任何改进代码的建议,我们将不胜感激。
#include <iostream>
#include <vector>
#include <boost/circular_buffer.hpp>
#include <numeric>
#include <assert.h>
#include <thread>
#include <mutex>
#include <chrono>
#include <time.h>
#include <cstdint>
#include <fstream>
#include <string>
using std::cin;
using std::cout;
using std::endl;
using std::fstream;
using std::string;
#define SOME_FIXED_HARDCODED_NUMBER 40980
class MyClass {
public:
std::vector<int8_t> vec;
public:
MyClass(std::vector<int8_t> v){ vec = v; }
};
boost::circular_buffer<MyClass> cb(300);
int waiting = 1;
std::mutex my_mutex;
FILE *out_file;
FILE *in_file;
void foo()
{
while (waiting) {
std::unique_lock<std::mutex> lock(my_mutex);
if (!cb.size() || waiting == 0) {
lock.unlock();
continue;
}
if (!waiting)
break;
MyClass local_buf = cb.front();
cb.pop_front();
fwrite(local_buf.vec.data(), 1, local_buf.vec.size(), out_file);
}
}
int main(int argc, char* argv[])
{
out_file = fopen("output_data.raw", "w");
in_file = fopen("input_data.raw", "w");
std::thread th1(foo);
char *buf = {"abc"};
int counter = 0;
std::vector<int8_t> mem;
mem.insert(mem.end(), buf, buf + strlen(buf));
while (counter < SOME_FIXED_HARDCODED_NUMBER)
{
{
std::unique_lock<std::mutex> lock(my_mutex);
/* if the circular buffer is full then wait for consumer to pull the data */
while (cb.full()) {
lock.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::unique_lock<std::mutex> lock(my_mutex);
}
cb.push_front(MyClass(mem));
fwrite(mem.data(), 1, mem.size(), in_file);
}
counter++;
}
waiting = 0;
th1.join();
fclose(out_file);
fclose(in_file);
return 0;
}
while (cb.full()) {
lock.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
>>> std::unique_lock<std::mutex> lock(my_mutex);
}
标记的 unique_lock
不会执行任何操作,因为它会立即超出范围并解锁互斥体。因此,一旦你离开循环,互斥锁就不会被锁定,你就会遇到竞争条件。相反,您应该使用 lock.lock()
重新锁定互斥量。
还有一些错误。您不是在等待 foo
线程实际耗尽缓冲区。一旦等待标志被主线程设置,它就会停止。另外,等待应该是原子的。