无锁并发示例;为什么不安全?
lock free concurrency example; why is it not safe?
我正在尝试了解 C++ 中的并发性,在这样做的过程中,我正在试验看看哪些有效,哪些无效。下面的示例设计不佳,我知道有更好的设计方法,但我想知道为什么线程 1 和线程 2 似乎能够在共享数组中相互覆盖。我认为在共享 idx_atomic
索引的加载和写入之上和之下使用 acquire/release 语义对共享 flag_atomic
变量的操作将阻止线程 1 和线程 2 检索相同的索引值,无论idx_atomic
个操作内存标签?
作为参考,我使用的是 MSVC 和 x64。
#include <iostream>
#include <vector>
#include <atomic>
#include <thread>
#include <chrono>
using namespace std::chrono; // for ""ms operator
const size_t c_size = 40;
std::vector<int> shared_array;
std::atomic<bool> sync_start_atomic = false;
std::atomic<bool> flag_atomic = false;
std::atomic<size_t> idx_atomic = 0;
void thread1_x() {
bool expected_flag = false;
size_t temp_idx = 0;
while (!sync_start_atomic.load(std::memory_order_relaxed));
for (size_t i = 0; i < (c_size / 2); ++i) {
while (flag_atomic.compare_exchange_weak(expected_flag, true, std::memory_order_acq_rel, std::memory_order_acquire)) {
expected_flag = false;
}
temp_idx = idx_atomic.load(std::memory_order_relaxed);
idx_atomic.store((temp_idx + 1), std::memory_order_relaxed);
flag_atomic.store(false, std::memory_order_release);
shared_array[temp_idx] = i;
}
}
void thread2_x() {
bool expected_flag = false;
size_t temp_idx = 0;
while (!sync_start_atomic.load(std::memory_order_relaxed));
for (size_t i = 0; i < (c_size / 2); ++i) {
while (flag_atomic.compare_exchange_weak(expected_flag, true, std::memory_order_acq_rel, std::memory_order_acquire)) {
expected_flag = false;
}
temp_idx = idx_atomic.load(std::memory_order_relaxed);
idx_atomic.store((temp_idx + 1), std::memory_order_relaxed);
flag_atomic.store(false, std::memory_order_release);
shared_array[temp_idx] = i + 100;
}
}
void main(){
shared_array.reserve(c_size);
shared_array.assign(c_size, 0);
std::thread tn_1(thread1_x);
std::thread tn_2(thread2_x);
std::this_thread::sleep_for(60ms);
sync_start_atomic.store(true, std::memory_order_relaxed);
tn_1.join();
tn_2.join();
for (size_t i = 0; i < c_size; ++i) {
std::cout << shared_array[i] << " ";
}
std::cout << "\n";
}
示例真实输出:
100, 1, 101, 2, 3, 102, 4, 103, 104, 6, 106, 8, 108, 9, 10, 109, 11, 110, 12, 111, 14, 112, 113 , 16, 17, 18, 115, 19, 116, 117, 118, 119, 0, 0, 0, 0, 0, 0, 0, 0.
示例预期输出:
0, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 1, 2, 114, 3, 115, 4, 5, 6 , 7, 8, 9, 10, 11, 12, 13, 14, 15, 116, 16, 117, 17, 118, 18, 119, 19.
您的示例输出表明两个线程都在同时访问 idx_atomic
,这表明您的 flag_atomic
循环存在问题。您正在使用的条件检查是倒退的。 compare_exchange_weak
将 return flag_atomic == expected_flag
比较的结果 - 换句话说,它 returns true
当值被更新时。由于发生这种情况时要退出循环,因此比较应该是
while (!flag_atomic.compare_exchange_weak(expected_flag, true, std::memory_order_acq_rel, std::memory_order_acquire))
我正在尝试了解 C++ 中的并发性,在这样做的过程中,我正在试验看看哪些有效,哪些无效。下面的示例设计不佳,我知道有更好的设计方法,但我想知道为什么线程 1 和线程 2 似乎能够在共享数组中相互覆盖。我认为在共享 idx_atomic
索引的加载和写入之上和之下使用 acquire/release 语义对共享 flag_atomic
变量的操作将阻止线程 1 和线程 2 检索相同的索引值,无论idx_atomic
个操作内存标签?
作为参考,我使用的是 MSVC 和 x64。
#include <iostream>
#include <vector>
#include <atomic>
#include <thread>
#include <chrono>
using namespace std::chrono; // for ""ms operator
const size_t c_size = 40;
std::vector<int> shared_array;
std::atomic<bool> sync_start_atomic = false;
std::atomic<bool> flag_atomic = false;
std::atomic<size_t> idx_atomic = 0;
void thread1_x() {
bool expected_flag = false;
size_t temp_idx = 0;
while (!sync_start_atomic.load(std::memory_order_relaxed));
for (size_t i = 0; i < (c_size / 2); ++i) {
while (flag_atomic.compare_exchange_weak(expected_flag, true, std::memory_order_acq_rel, std::memory_order_acquire)) {
expected_flag = false;
}
temp_idx = idx_atomic.load(std::memory_order_relaxed);
idx_atomic.store((temp_idx + 1), std::memory_order_relaxed);
flag_atomic.store(false, std::memory_order_release);
shared_array[temp_idx] = i;
}
}
void thread2_x() {
bool expected_flag = false;
size_t temp_idx = 0;
while (!sync_start_atomic.load(std::memory_order_relaxed));
for (size_t i = 0; i < (c_size / 2); ++i) {
while (flag_atomic.compare_exchange_weak(expected_flag, true, std::memory_order_acq_rel, std::memory_order_acquire)) {
expected_flag = false;
}
temp_idx = idx_atomic.load(std::memory_order_relaxed);
idx_atomic.store((temp_idx + 1), std::memory_order_relaxed);
flag_atomic.store(false, std::memory_order_release);
shared_array[temp_idx] = i + 100;
}
}
void main(){
shared_array.reserve(c_size);
shared_array.assign(c_size, 0);
std::thread tn_1(thread1_x);
std::thread tn_2(thread2_x);
std::this_thread::sleep_for(60ms);
sync_start_atomic.store(true, std::memory_order_relaxed);
tn_1.join();
tn_2.join();
for (size_t i = 0; i < c_size; ++i) {
std::cout << shared_array[i] << " ";
}
std::cout << "\n";
}
示例真实输出:
100, 1, 101, 2, 3, 102, 4, 103, 104, 6, 106, 8, 108, 9, 10, 109, 11, 110, 12, 111, 14, 112, 113 , 16, 17, 18, 115, 19, 116, 117, 118, 119, 0, 0, 0, 0, 0, 0, 0, 0.
示例预期输出:
0, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 1, 2, 114, 3, 115, 4, 5, 6 , 7, 8, 9, 10, 11, 12, 13, 14, 15, 116, 16, 117, 17, 118, 18, 119, 19.
您的示例输出表明两个线程都在同时访问 idx_atomic
,这表明您的 flag_atomic
循环存在问题。您正在使用的条件检查是倒退的。 compare_exchange_weak
将 return flag_atomic == expected_flag
比较的结果 - 换句话说,它 returns true
当值被更新时。由于发生这种情况时要退出循环,因此比较应该是
while (!flag_atomic.compare_exchange_weak(expected_flag, true, std::memory_order_acq_rel, std::memory_order_acquire))