无锁并发示例;为什么不安全?

lock free concurrency example; why is it not safe?

我正在尝试了解 C++ 中的并发性,在这样做的过程中,我正在试验看看哪些有效,哪些无效。下面的示例设计不佳,我知道有更好的设计方法,但我想知道为什么线程 1 和线程 2 似乎能够在共享数组中相互覆盖。我认为在共享 idx_atomic 索引的加载和写入之上和之下使用 acquire/release 语义对共享 flag_atomic 变量的操作将阻止线程 1 和线程 2 检索相同的索引值,无论idx_atomic 个操作内存标签?

作为参考,我使用的是 MSVC 和 x64。

#include <iostream>
#include <vector>
#include <atomic>
#include <thread>
#include <chrono>

using namespace std::chrono;    // for ""ms operator

const size_t c_size = 40;
std::vector<int> shared_array;
std::atomic<bool> sync_start_atomic = false;
std::atomic<bool> flag_atomic = false;
std::atomic<size_t> idx_atomic = 0;


void thread1_x() {
    bool expected_flag = false;
    size_t temp_idx = 0;
    while (!sync_start_atomic.load(std::memory_order_relaxed));
    for (size_t i = 0; i < (c_size / 2); ++i) {
        while (flag_atomic.compare_exchange_weak(expected_flag, true, std::memory_order_acq_rel, std::memory_order_acquire)) {
                expected_flag = false;
        }
        temp_idx = idx_atomic.load(std::memory_order_relaxed);
        idx_atomic.store((temp_idx + 1), std::memory_order_relaxed);
        flag_atomic.store(false, std::memory_order_release);
        shared_array[temp_idx] = i;

    }
}

void thread2_x() {
       bool expected_flag = false;
    size_t temp_idx = 0;
    while (!sync_start_atomic.load(std::memory_order_relaxed));
    for (size_t i = 0; i < (c_size / 2); ++i) {
        while (flag_atomic.compare_exchange_weak(expected_flag, true, std::memory_order_acq_rel, std::memory_order_acquire)) {
            expected_flag = false;
        }
        temp_idx = idx_atomic.load(std::memory_order_relaxed);
        idx_atomic.store((temp_idx + 1), std::memory_order_relaxed);
        flag_atomic.store(false, std::memory_order_release);
        shared_array[temp_idx] = i + 100;

    }
}

void main(){
    shared_array.reserve(c_size);
    shared_array.assign(c_size, 0);
    std::thread tn_1(thread1_x);
    std::thread tn_2(thread2_x);
    std::this_thread::sleep_for(60ms);
    sync_start_atomic.store(true, std::memory_order_relaxed);

    tn_1.join();
    tn_2.join();

    for (size_t i = 0; i < c_size; ++i) {
        std::cout << shared_array[i] << " ";
    }
    std::cout << "\n";
}

示例真实输出:

100, 1, 101, 2, 3, 102, 4, 103, 104, 6, 106, 8, 108, 9, 10, 109, 11, 110, 12, 111, 14, 112, 113 , 16, 17, 18, 115, 19, 116, 117, 118, 119, 0, 0, 0, 0, 0, 0, 0, 0.

示例预期输出:

0, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 1, 2, 114, 3, 115, 4, 5, 6 , 7, 8, 9, 10, 11, 12, 13, 14, 15, 116, 16, 117, 17, 118, 18, 119, 19.

您的示例输出表明两个线程都在同时访问 idx_atomic,这表明您的 flag_atomic 循环存在问题。您正在使用的条件检查是倒退的。 compare_exchange_weak 将 return flag_atomic == expected_flag 比较的结果 - 换句话说,它 returns true 当值被更新时。由于发生这种情况时要退出循环,因此比较应该是

while (!flag_atomic.compare_exchange_weak(expected_flag, true, std::memory_order_acq_rel, std::memory_order_acquire))