为什么 Rust 互斥锁似乎没有给最后想要锁定它的线程锁?

Why do Rust mutexes not seem to give the lock to the thread that wanted to lock it last?

我想编写一个程序来产生两个锁定 Mutex 的线程,增加它,打印一些东西,然后解锁 Mutex 这样另一个线程可以做同样的事情。我添加了一些睡眠时间以使其更加一致,所以我认为输出应该类似于:

ping pong ping pong …  

但实际输出是相当随机的。大多数时候,它只是

ping ping ping … pong 

但是根本没有一致性;有时中间也有一个“乒乓球”。

我相信互斥体有某种方法可以确定谁想最后锁定它,但事实并非如此。

  1. 锁定实际上是如何工作的?
  2. 如何获得所需的输出?
use std::sync::{Arc, Mutex};
use std::{thread, time};

fn main() {
    let data1 = Arc::new(Mutex::new(1));
    let data2 = data1.clone();
    let ten_millis = time::Duration::from_millis(10);

    let a = thread::spawn(move || loop {
        let mut data = data1.lock().unwrap();
        thread::sleep(ten_millis);
        println!("ping ");
        *data += 1;
        if *data > 10 {
            break;
        }
    });

    let b = thread::spawn(move || loop {
        let mut data = data2.lock().unwrap();
        thread::sleep(ten_millis);
        println!("pong ");
        *data += 1;
        if *data > 10 {
            break;
        }
    });

    a.join().unwrap();
    b.join().unwrap();
}

不以任何方式保证锁定互斥量的顺序;第一个线程有可能 100% 的时间获取锁,而第二个线程 0%

线程由 OS 调度,以下情况很可能发生:

  1. OS 给第一个线程 CPU 时间,它获得锁
  2. OS 给第二个线程 CPU 时间,但锁被占用,因此它进入睡眠状态
  3. 第一个线程释放了锁,但仍然被 OS 允许 运行。它用于循环的另一次迭代和re-acquires锁
  4. 另一个线程无法继续,因为锁仍然被占用。

如果你给第二个线程更多的时间来获取锁你会看到预期的 ping-pong 模式,虽然没有保证(一个坏的 OS 可能决定永远不给 CPU 时间到你的一些线程):

use std::sync::{Arc, Mutex};
use std::{thread, time};

fn main() {
    let data1 = Arc::new(Mutex::new(1));
    let data2 = data1.clone();

    let ten_millis = time::Duration::from_millis(10);

    let a = thread::spawn(move || loop {
        let mut data = data1.lock().unwrap();
        *data += 1;
        if *data > 10 {
            break;
        }

        drop(data);
        thread::sleep(ten_millis);
        println!("ping ");
    });

    let b = thread::spawn(move || loop {
        let mut data = data2.lock().unwrap();
        *data += 1;
        if *data > 10 {
            break;
        }

        drop(data);
        thread::sleep(ten_millis);
        println!("pong ");
    });

    a.join().unwrap();
    b.join().unwrap();
}

您可以通过播放睡眠时间来验证这一点。睡眠时间越短,ping-pong 交替就会越不规则,低至 10ms 的值,您可能会看到 ping-ping-pong,等等

本质上,基于时间的解决方案在设计上是糟糕的。你可以通过改进算法保证 "ping" 后面跟着 "pong"。例如,您可以在奇数上打印 "ping",在偶数上打印 "pong":

use std::sync::{Arc, Mutex};
use std::{thread, time};

const MAX_ITER: i32 = 10;

fn main() {
    let data1 = Arc::new(Mutex::new(1));
    let data2 = data1.clone();

    let ten_millis = time::Duration::from_millis(10);

    let a = thread::spawn(move || 'outer: loop {
        loop {
            thread::sleep(ten_millis);
            let mut data = data1.lock().unwrap();
            if *data > MAX_ITER {
                break 'outer;
            }

            if *data & 1 == 1 {
                *data += 1;
                println!("ping ");
                break;
            }
        }
    });

    let b = thread::spawn(move || 'outer: loop {
        loop {
            thread::sleep(ten_millis);
            let mut data = data2.lock().unwrap();
            if *data > MAX_ITER {
                break 'outer;
            }

            if *data & 1 == 0 {
                *data += 1;
                println!("pong ");
                break;
            }
        }
    });

    a.join().unwrap();
    b.join().unwrap();
}

这不是最好的实现方式,但我尝试对原始代码进行尽可能少的修改。

您也可以考虑使用 Condvar 的实现,在我看来,这是一个更好的解决方案,因为它避免了在互斥量上的忙碌等待(ps:还删除了代码重复):

use std::sync::{Arc, Mutex, Condvar};
use std::thread;

const MAX_ITER: i32 = 10;

fn main() {
    let cv1 = Arc::new((Condvar::new(), Mutex::new(1)));
    let cv2 = cv1.clone();

    let a = thread::spawn(ping_pong_task("ping", cv1, |x| x & 1 == 1));
    let b = thread::spawn(ping_pong_task("pong", cv2, |x| x & 1 == 0));

    a.join().unwrap();
    b.join().unwrap();
}

fn ping_pong_task<S: Into<String>>(
        msg: S, 
        cv: Arc<(Condvar, Mutex<i32>)>, 
        check: impl Fn(i32) -> bool) -> impl Fn() 
{
    let message = msg.into();

    move || {
        let (condvar, mutex) = &*cv;

        let mut value = mutex.lock().unwrap();
        loop {
            if check(*value) {
                println!("{} ", message);
                *value += 1;
                condvar.notify_all();
            }

            if *value > MAX_ITER {
                break;
            }

            value = condvar.wait(value).unwrap();
        }
    }
}

MutexRwLock 都遵循 OS-specific 原语,不能保证公平。在 Windows 上,它们都使用 SRW locks 实现,具体记录为 not fair。我没有对其他操作系统进行研究,但你绝对不能依赖 std::sync::Mutex 的公平性,尤其是当你需要此代码可移植时。

Rust 中的一个可能的解决方案是 Mutex implementation provided by the parking_lot crate, which provides an unlock_fair method,它是用一个公平的算法实现的。

来自parking_lot documentation:

By default, mutexes are unfair and allow the current thread to re-lock the mutex before another has the chance to acquire the lock, even if that thread has been blocked on the mutex for a long time. This is the default because it allows much higher throughput as it avoids forcing a context switch on every mutex unlock. This can result in one thread acquiring a mutex many more times than other threads.

However in some cases it can be beneficial to ensure fairness by forcing the lock to pass on to a waiting thread if there is one. This is done by using this method instead of dropping the MutexGuard normally.

虽然 parking_lot::Mutex 在没有专门使用 unlock_fair 方法的情况下并不声称是公平的,但我发现您的代码产生的 ping 数量与 pong 数量相同,只需进行切换(playground), 甚至不使用 unlock_fair 方法。

通常,当守卫超出范围时,互斥锁会自动解锁。为了让它公平解锁,你需要在守卫被删除之前插入这个方法调用:

let b = thread::spawn(move || loop {
    let mut data = data1.lock();
    thread::sleep(ten_millis);
    println!("pong ");
    *data += 1;
    if *data > 10 {
        break;
    }
    MutexGuard::unlock_fair(data);
});

I was of the belief that mutexes had some kind of way to determine who wanted to lock it last but it doesn’t look like that’s the case.

没有。互斥量的工作只是使代码 运行 尽可能快。交替会产生最差的性能,因为您不断地耗尽 CPU 缓存。你要求的是最糟糕的互斥实现。

How does the locking actually work?

调度程序尝试完成尽可能多的工作。您的工作是编写仅完成您真正想要完成的工作的代码。

How can I get the desired output?

如果您只想做一件事然后再做另一件事,那么不要使用两个线程。当您不关心工作完成的顺序并且只想完成尽可能多的工作时,请使用线程。