Does calling `into_inner()` on an atomic take into account all the relaxed writes?

Does `into_inner()` return all the relaxed writes in this example program? If so, which concept guarantees this?

extern crate crossbeam;

use std::sync::atomic::{AtomicUsize, Ordering};

fn main() {
    let thread_count = 10;
    let increments_per_thread = 100000;
    let i = AtomicUsize::new(0);

    crossbeam::scope(|scope| {
        for _ in 0..thread_count {
            scope.spawn(|| {
                for _ in 0..increments_per_thread {
                    i.fetch_add(1, Ordering::Relaxed);
                }
            });
        }
    });

    println!(
        "Result of {}*{} increments: {}",
        thread_count,
        increments_per_thread,
        i.into_inner()
    );
}

(https://play.rust-lang.org/?gist=96f49f8eb31a6788b970cf20ec94f800&version=stable)

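I know that crossbeam guarantees that all threads have finished, and since ownership goes back to the main thread, I also know that there will be no outstanding borrows. But the way I see it, there could still be outstanding pending writes, if not on the CPUs, then in the caches.

Which concept guarantees that all writes are finished and all caches are synced back to the main thread when `into_inner()` is called? Is it possible to lose writes?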

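The very fact that you can call `into_inner` (which consumes the `AtomicUsize`) means that there are no more borrows on that backing storage.

Each `fetch_add` is an atomic operation with `Relaxed` ordering, so once the threads have completed there shouldn't be anything left that changes the value (if there were, that would be a bug in crossbeam).

See the description of `into_inner` for more info.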

Does into_inner() return all the relaxed writes in this example program? If so, which concept guarantees this?

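It's not `into_inner` that guarantees it, it's `join`.

What `into_inner` guarantees is that either there has been some synchronization since the final concurrent write (a `join` of the thread, the last `Arc` having been dropped and unwrapped with `try_unwrap`, etc.), or the atomic was never sent to another thread in the first place. Either case is sufficient to make the read free of data races.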
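The `Arc` case mentioned above can be sketched as follows. This is only an illustration, not code from the question: the helper name `count_and_consume` is hypothetical, and it assumes every worker's clone of the `Arc` has been dropped by the time its `join` returns, so that `Arc::try_unwrap` succeeds.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical helper: counts with relaxed increments, then consumes the atomic.
fn count_and_consume(thread_count: usize, increments_per_thread: usize) -> usize {
    let counter = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..thread_count)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..increments_per_thread {
                    counter.fetch_add(1, Ordering::Relaxed);
                }
            })
        })
        .collect();

    // Joining is the synchronization point. Each worker's clone of the Arc
    // is dropped when its closure finishes, which is before join returns.
    for handle in handles {
        handle.join().expect("worker thread panicked");
    }

    // No other clones remain, so we can take ownership of the atomic
    // and consume it without any further atomic operation.
    let atomic = Arc::try_unwrap(counter).expect("other Arc clones are still alive");
    atomic.into_inner()
}

fn main() {
    println!("Result: {}", count_and_consume(10, 100_000));
}
```

If any clone of the `Arc` were still alive, `try_unwrap` would return `Err` and `into_inner` could not be called at all, which is the ownership side of the guarantee described above.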

The crossbeam documentation is explicit about using `join` at the end of a scope:

This [the thread being guaranteed to terminate] is ensured by having the parent thread join on the child thread before the scope exits.
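For comparison, here is a minimal sketch of the same pattern using `std::thread::scope` from the standard library (available since Rust 1.63) instead of crossbeam. This is a substitution made for illustration and is not what the question uses; the function name `count_in_scope` is hypothetical. The standard library documents that all threads spawned in the scope that have not been joined manually are joined automatically before `scope` returns.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Hypothetical helper: scoped threads increment a counter borrowed from the stack.
fn count_in_scope(thread_count: usize, increments_per_thread: usize) -> usize {
    let counter = AtomicUsize::new(0);

    thread::scope(|s| {
        for _ in 0..thread_count {
            // The closure only borrows `counter`; no Arc is needed.
            s.spawn(|| {
                for _ in 0..increments_per_thread {
                    counter.fetch_add(1, Ordering::Relaxed);
                }
            });
        }
        // All spawned threads are joined here, before `scope` returns.
    });

    // The borrows have ended and the joins have synchronized with this thread,
    // so consuming the atomic yields the full count.
    counter.into_inner()
}

fn main() {
    println!("Result: {}", count_in_scope(10, 100_000));
}
```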

Regarding losing writes:

Which concept guarantees that all writes are finished and all caches are synced back to the main thread when into_inner() is called? Is it possible to lose writes?

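As stated in various places in the documentation, Rust inherits the C++ memory model for atomics. In C++11 and later, the completion of a thread synchronizes with the corresponding successful return from `join`. This means that by the time `join` completes, all actions performed by the joined thread must be visible to the thread that called `join`, so it is not possible to lose writes in this scenario.

In terms of atomics, you can think of a `join` as an acquire read of an atomic on which the thread performed a release store just before it finished executing.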
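That analogy can be made concrete with a small sketch. The `done` flag below is a hypothetical stand-in for "the thread has finished"; it is not how `join` is actually implemented. It only shows that a release store paired with an acquire load is enough to make the earlier relaxed increments visible.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical helper: reads the counter after an acquire load observes
/// the worker's release store, before the real join is called.
fn release_acquire_handoff(increments: usize) -> usize {
    let counter = Arc::new(AtomicUsize::new(0));
    let done = Arc::new(AtomicBool::new(false));

    let worker = {
        let counter = Arc::clone(&counter);
        let done = Arc::clone(&done);
        thread::spawn(move || {
            for _ in 0..increments {
                counter.fetch_add(1, Ordering::Relaxed);
            }
            // Release store: everything above happens-before any acquire
            // load that observes `true`.
            done.store(true, Ordering::Release);
        })
    };

    // Acquire load: once it reads `true`, it synchronizes with the store.
    while !done.load(Ordering::Acquire) {
        thread::yield_now();
    }

    // A relaxed load is sufficient here, because visibility was already
    // established by the release/acquire pair.
    let observed = counter.load(Ordering::Relaxed);

    worker.join().expect("worker thread panicked");
    observed
}

fn main() {
    println!("Observed: {}", release_acquire_handoff(100_000));
}
```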

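I will include this answer as a potential complement to the other two.

The kind of inconsistency that was mentioned, namely whether some writes could be missing before the final read of the counter, is not possible here. It would be undefined behaviour if writes to a value could be postponed until after its consumption with `into_inner`. However, there are no unexpected race conditions in this program, even without the counter being consumed with `into_inner`, and even without the help of crossbeam scopes.

Let us write a new version of the program without crossbeam scopes and in which the counter is not consumed (Playground):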

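use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let thread_count = 10;
    let increments_per_thread = 100000;
    let i = Arc::new(AtomicUsize::new(0));

    // Each thread gets its own clone of the Arc and increments the shared counter.
    let threads: Vec<_> = (0..thread_count)
        .map(|_| {
            let i = i.clone();
            thread::spawn(move || {
                for _ in 0..increments_per_thread {
                    i.fetch_add(1, Ordering::Relaxed);
                }
            })
        })
        .collect();

    // Joining every thread synchronizes its writes with the main thread.
    for t in threads {
        t.join().unwrap();
    }

    // The counter is read with a relaxed load rather than consumed.
    println!(
        "Result of {}*{} increments: {}",
        thread_count,
        increments_per_thread,
        i.load(Ordering::Relaxed)
    );
}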

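This version still works just fine! Why? Because a synchronizes-with relation is established between the ending thread and its corresponding `join`. Therefore, as another answer explains, all actions performed by the joined thread must be visible to the calling thread.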

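One might also wonder whether even the relaxed memory ordering constraint is sufficient to guarantee that the full program behaves as expected. This part is addressed by the Rust Nomicon: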

Relaxed accesses are the absolute weakest. They can be freely re-ordered and provide no happens-before relationship. Still, relaxed operations are still atomic. That is, they don't count as data accesses and any read-modify-write operations done to them occur atomically. Relaxed operations are appropriate for things that you definitely want to happen, but don't particularly otherwise care about. For instance, incrementing a counter can be safely done by multiple threads using a relaxed fetch_add if you're not using the counter to synchronize any other accesses.

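The use case mentioned is exactly what we are doing here. No thread needs to observe the incremented counter in order to make decisions, and yet all operations are atomic. In the end, the thread joins synchronize with the main thread, which implies a happens-before relation and guarantees that the operations are visible there. Since Rust adopts the same memory model as C++11 (this is implemented by LLVM internally), we can rely on what is said about the C++ std::thread::join function: "The completion of the thread identified by *this synchronizes with the corresponding successful return". In fact, the very same example in C++ is available on cppreference.com as part of the explanation of the relaxed memory order constraint: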

#include <vector>
#include <iostream>
#include <thread>
#include <atomic>

std::atomic<int> cnt = {0};

void f()
{
    for (int n = 0; n < 1000; ++n) {
        cnt.fetch_add(1, std::memory_order_relaxed);
    }
}

int main()
{
    std::vector<std::thread> v;
    for (int n = 0; n < 10; ++n) {
        v.emplace_back(f);
    }
    for (auto& t : v) {
        t.join();
    }
    std::cout << "Final counter value is " << cnt << '\n';
}