使用 thread_local 维护并发内存缓冲区时出错
Error occurred when using thread_local to maintain a concurrent memory buffer
在下面的代码中,我想创建一个允许多个线程并发 read/write 的内存缓冲区。一次,所有线程将并行读取此缓冲区,稍后它们将并行写入缓冲区。但是不会有read/write同时操作
为此,我使用 shared_ptr<vector<uint64_t>>
的 vector
。当一个新线程到来时,它会被分配一个新的 vector<uint64_t>
并且只写入它。两个线程不会写入同一个向量。
我使用 thread_local 来跟踪矢量索引和当前线程将写入的偏移量。当我需要向 memory_
变量添加新缓冲区时,我使用互斥锁来保护它。
class TestBuffer {
public:
thread_local static uint32_t index_;
thread_local static uint32_t offset_;
thread_local static bool ready_;
vector<shared_ptr<vector<uint64_t>>> memory_;
mutex lock_;
void init() {
if (!ready_) {
new_slab();
ready_ = true;
}
}
void new_slab() {
std::lock_guard<mutex> lock(lock_);
index_ = memory_.size();
memory_.push_back(make_shared<vector<uint64_t>>(1000));
offset_ = 0;
}
void put(uint64_t value) {
init();
if (offset_ == 1000) {
new_slab();
}
if(memory_[index_] == nullptr) {
cout << "Error" << endl;
}
*(memory_[index_]->data() + offset_) = value;
offset_++;
}
};
thread_local uint32_t TestBuffer::index_ = 0;
thread_local uint32_t TestBuffer::offset_ = 0;
thread_local bool TestBuffer::ready_ = false;
int main() {
TestBuffer buffer;
vector<std::thread> threads;
for (int i = 0; i < 10; ++i) {
thread t = thread([&buffer, i]() {
for (int j = 0; j < 10000; ++j) {
buffer.put(i * 10000 + j);
}
});
threads.emplace_back(move(t));
}
for (auto &t: threads) {
t.join();
}
}
代码未按预期运行,报告错误在 put
函数中。根本原因是memory_[index_]
有时returnnullptr
。但是,我不明白为什么这是可能的,因为我认为我已经正确设置了值。感谢您的帮助!
您在 put
中存在由 new_slab()
引起的竞争条件。当 new_slab
调用 memory_.push_back()
时,_memory
向量可能需要调整自身的大小,如果在调整大小的过程中另一个线程正在执行 put
,memory_[index_]
可能会访问过时的数据。
一种解决方案是通过锁定互斥量来保护 _memory
向量:
{
std::lock_guard<mutex> lock(lock_);
if(memory_[index_] == nullptr) {
cout << "Error" << endl;
}
*(memory_[index_]->data() + offset_) = value;
}
另一种是提前在memory_
vector中预留你需要的space
在下面的代码中,我想创建一个允许多个线程并发 read/write 的内存缓冲区。一次,所有线程将并行读取此缓冲区,稍后它们将并行写入缓冲区。但是不会有read/write同时操作
为此,我使用 shared_ptr<vector<uint64_t>>
的 vector
。当一个新线程到来时,它会被分配一个新的 vector<uint64_t>
并且只写入它。两个线程不会写入同一个向量。
我使用 thread_local 来跟踪矢量索引和当前线程将写入的偏移量。当我需要向 memory_
变量添加新缓冲区时,我使用互斥锁来保护它。
class TestBuffer {
public:
thread_local static uint32_t index_;
thread_local static uint32_t offset_;
thread_local static bool ready_;
vector<shared_ptr<vector<uint64_t>>> memory_;
mutex lock_;
void init() {
if (!ready_) {
new_slab();
ready_ = true;
}
}
void new_slab() {
std::lock_guard<mutex> lock(lock_);
index_ = memory_.size();
memory_.push_back(make_shared<vector<uint64_t>>(1000));
offset_ = 0;
}
void put(uint64_t value) {
init();
if (offset_ == 1000) {
new_slab();
}
if(memory_[index_] == nullptr) {
cout << "Error" << endl;
}
*(memory_[index_]->data() + offset_) = value;
offset_++;
}
};
thread_local uint32_t TestBuffer::index_ = 0;
thread_local uint32_t TestBuffer::offset_ = 0;
thread_local bool TestBuffer::ready_ = false;
int main() {
TestBuffer buffer;
vector<std::thread> threads;
for (int i = 0; i < 10; ++i) {
thread t = thread([&buffer, i]() {
for (int j = 0; j < 10000; ++j) {
buffer.put(i * 10000 + j);
}
});
threads.emplace_back(move(t));
}
for (auto &t: threads) {
t.join();
}
}
代码未按预期运行,报告错误在 put
函数中。根本原因是memory_[index_]
有时returnnullptr
。但是,我不明白为什么这是可能的,因为我认为我已经正确设置了值。感谢您的帮助!
您在 put
中存在由 new_slab()
引起的竞争条件。当 new_slab
调用 memory_.push_back()
时,_memory
向量可能需要调整自身的大小,如果在调整大小的过程中另一个线程正在执行 put
,memory_[index_]
可能会访问过时的数据。
一种解决方案是通过锁定互斥量来保护 _memory
向量:
{
std::lock_guard<mutex> lock(lock_);
if(memory_[index_] == nullptr) {
cout << "Error" << endl;
}
*(memory_[index_]->data() + offset_) = value;
}
另一种是提前在memory_
vector中预留你需要的space