在 Rust 中使用 Mutex 和 Condvar 实现缓冲区

Buffer in Rust with Mutex and Condvar

我正在尝试实现一个只有一个消费者和一个生产者的缓冲区。我以前只用过 POSIX 信号量,但 Rust 中没有提供它们,所以我想用 Rust 的同步原语(Mutex、Condvar、Barrier 等)来实现,并且不想使用通道(channel)。

我的代码行为很不稳定:有时运行正常,有时停在某个数字上,有时则根本不开始计数。

如果我在主线程中先等待 1 秒再发送 Condvar 通知,情况似乎会好一些,但这并不能保证不会发生死锁。

如何修复这个程序?我理解 Condvar 错了吗?

use std::thread;
use std::sync::{Arc, Condvar, Mutex};

/// Shared state for the one-slot producer/consumer hand-off.
///
/// The "data available" and "space available" flags live in two separate
/// mutexes, each paired with its own condition variable, and the value being
/// handed over sits behind a third mutex.
struct Buffer {
    /// `true` while `buffer` holds a value the consumer has not printed yet.
    is_data: Mutex<bool>,
    /// Notified by the producer after it sets `is_data` to `true`.
    is_data_cv: Condvar,
    /// `true` while the producer is allowed to overwrite `buffer`.
    is_space: Mutex<bool>,
    /// Notified by the consumer after it sets `is_space` to `true`, and once
    /// by `main`.
    is_space_cv: Condvar,
    /// The single slot carrying the value from producer to consumer.
    buffer: Mutex<i32>,
}

/// Writes the values `0..50` into the shared slot, one per hand-off.
///
/// NOTE(review): every attempt starts with an unconditional `Condvar::wait`,
/// before `is_space` has been inspected. A `notify_one` issued while this
/// thread is not yet waiting is lost, so the thread can sleep forever even
/// though the flag is already `true`. The flag should be tested first and
/// `wait` called only while it is `false`.
fn producer(buffer: Arc<Buffer>) {
    for i in 0..50 {
        loop {
            // Locks `is_space` and immediately sleeps on `is_space_cv`; the
            // flag is not looked at before going to sleep.
            let mut is_space = buffer
                .is_space_cv
                .wait(buffer.is_space.lock().unwrap())
                .unwrap();
            // Re-check after waking: if the flag is still `false` the outer
            // `loop` goes back to waiting.
            if *is_space {
                {
                    // `hueco` (Spanish for "slot"): store the value, releasing
                    // the slot lock at the end of this scope.
                    let mut hueco = buffer.buffer.lock().unwrap();
                    *hueco = i;
                }

                *is_space = false;
                {
                    // Publish "data available" while still holding the
                    // `is_space` guard.
                    let mut is_data = buffer.is_data.lock().unwrap();
                    *is_data = true;
                }
                // Wakes the consumer only if it is already blocked in `wait`.
                buffer.is_data_cv.notify_one();
                break;
            }
        }
    }
}

/// Prints 50 values taken from the shared slot, one per hand-off.
///
/// NOTE(review): every attempt starts with an unconditional `Condvar::wait`,
/// before `is_data` has been inspected. If the producer's `notify_one` fires
/// before this thread is waiting, the notification is lost and this thread
/// sleeps forever even though `is_data` is already `true`. The flag should be
/// tested first and `wait` called only while it is `false`.
fn consumer(buffer: Arc<Buffer>) {
    // The loop variable is only a counter; the printed value comes from the
    // shared slot.
    for i in 0..50 {
        loop {
            // Locks `is_data` and immediately sleeps on `is_data_cv`; the flag
            // is not looked at before going to sleep.
            let mut is_data = buffer
                .is_data_cv
                .wait(buffer.is_data.lock().unwrap())
                .unwrap();
            // Re-check after waking: if the flag is still `false` the outer
            // `loop` goes back to waiting.
            if *is_data {
                {
                    // `hueco` (Spanish for "slot"): read and print the value,
                    // releasing the slot lock at the end of this scope.
                    let hueco = buffer.buffer.lock().unwrap();
                    println!("{}", *hueco);
                }
                *is_data = false;
                {
                    // Publish "space available" while still holding the
                    // `is_data` guard.
                    let mut is_space = buffer.is_space.lock().unwrap();
                    *is_space = true;
                }
                // Wakes the producer only if it is already blocked in `wait`.
                buffer.is_space_cv.notify_one();
                break;
            }
        }
    }
}

/// Builds the shared state, starts one producer and one consumer thread, and
/// waits for the consumer.
fn main() {
    // Initial state: no data yet, slot free.
    let buffer = Arc::new(Buffer {
        is_data: Mutex::new(false),
        is_data_cv: Condvar::new(),
        is_space: Mutex::new(true),
        is_space_cv: Condvar::new(),
        buffer: Mutex::new(0),
    });
    let b = buffer.clone();
    let p = thread::spawn(move || {
        producer(b);
    });
    let b = buffer.clone();
    let c = thread::spawn(move || {
        consumer(b);
    });

    // Disabled experiment: sleeping here gives the producer time to reach its
    // `wait` before the notification below is sent.
    //thread::sleep_ms(1000);

    // NOTE(review): the producer waits before checking `is_space`, so it needs
    // this notification to get started. Nothing orders this call after the
    // producer's `wait`; if it runs first, the notification is lost and the
    // producer never begins.
    buffer.is_space_cv.notify_one();
    // NOTE(review): the `Result` of `join` is discarded, and the producer
    // handle `p` is never joined.
    c.join();
}

我鼓励您创建更小的方法并重用现有的 Rust 类型,例如 Option。这将使您的代码大大简化 — 只有一个 Mutex 和一个 Condvar:

use std::thread;
use std::sync::{Arc, Condvar, Mutex};

/// One-slot hand-off buffer guarded by a single mutex/condvar pair.
///
/// `data` is `Some` while a value is waiting to be collected and `None` while
/// the slot is free. `insert` and `remove` both sleep on, and signal, the same
/// `data_cv`; each wakes a single waiter via `notify_one`.
#[derive(Debug, Default)]
struct Buffer {
    data: Mutex<Option<i32>>,
    data_cv: Condvar,
}

impl Buffer {
    /// Blocks until the slot is free, stores `val` in it, then wakes one
    /// thread waiting on `data_cv`.
    ///
    /// # Panics
    ///
    /// Panics if the mutex has been poisoned.
    fn insert(&self, val: i32) {
        let guard = self.data.lock().expect("Can't lock");
        // `wait_while` re-tests the predicate after every wake-up, so the
        // slot is known to be free once it returns.
        let mut slot = self
            .data_cv
            .wait_while(guard, |slot| slot.is_some())
            .expect("Can't wait");
        *slot = Some(val);
        self.data_cv.notify_one();
    }

    /// Blocks until the slot holds a value, takes it out (leaving the slot
    /// free), wakes one thread waiting on `data_cv`, and returns the value.
    ///
    /// # Panics
    ///
    /// Panics if the mutex has been poisoned.
    fn remove(&self) -> i32 {
        let guard = self.data.lock().expect("Can't lock");
        let mut slot = self
            .data_cv
            .wait_while(guard, |slot| slot.is_none())
            .expect("Can't wait");
        // The predicate above guarantees the slot is occupied here.
        let taken = slot.take().unwrap();
        self.data_cv.notify_one();
        taken
    }
}

/// Feeds the integers `0..50` into `buffer` in ascending order, printing each
/// one (prefixed with `p: `) just before it is inserted.
fn producer(buffer: &Buffer) {
    (0..50).for_each(|n| {
        println!("p: {}", n);
        buffer.insert(n);
    });
}

/// Removes 50 values from `buffer`, printing each one (prefixed with `c: `)
/// as soon as it has been taken.
fn consumer(buffer: &Buffer) {
    // The iterator chain is lazy, so each value is printed right after it is
    // removed and before the next removal starts.
    (0..50)
        .map(|_| buffer.remove())
        .for_each(|received| println!("c: {}", received));
}

/// Runs one producer thread and one consumer thread over a shared `Buffer`
/// and waits for both, consumer first.
fn main() {
    let shared = Arc::new(Buffer::default());

    // Each thread gets its own handle to the shared buffer.
    let producer_handle = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || producer(&shared))
    };
    let consumer_handle = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || consumer(&shared))
    };

    consumer_handle.join().expect("Consumer had an error");
    producer_handle.join().expect("Producer had an error");
}

如果您想获得更高的性能(通过基准测试看是否值得),您可以分别为 "empty" 和 "full" 条件设置 Condvars:

/// One-slot hand-off buffer with a dedicated condition variable per
/// direction.
///
/// `insert` sleeps on `is_empty` and signals `is_full`; `remove` sleeps on
/// `is_full` and signals `is_empty`.
#[derive(Debug, Default)]
struct Buffer {
    /// `Some` while a value is waiting to be collected, `None` otherwise.
    data: Mutex<Option<i32>>,
    /// Notified by `remove` once the slot has been emptied.
    is_empty: Condvar,
    /// Notified by `insert` once the slot has been filled.
    is_full: Condvar,
}

impl Buffer {
    /// Blocks until the slot is free, stores `val`, and notifies one thread
    /// waiting on `is_full`.
    ///
    /// # Panics
    ///
    /// Panics if the mutex has been poisoned.
    fn insert(&self, val: i32) {
        let guard = self.data.lock().expect("Can't lock");
        // Sleep on `is_empty` for as long as the slot is still occupied.
        let mut slot = self
            .is_empty
            .wait_while(guard, |slot| slot.is_some())
            .expect("Can't wait");
        *slot = Some(val);
        self.is_full.notify_one();
    }

    /// Blocks until the slot holds a value, takes it out, notifies one thread
    /// waiting on `is_empty`, and returns the value.
    ///
    /// # Panics
    ///
    /// Panics if the mutex has been poisoned.
    fn remove(&self) -> i32 {
        let guard = self.data.lock().expect("Can't lock");
        // Sleep on `is_full` for as long as there is nothing to take.
        let mut slot = self
            .is_full
            .wait_while(guard, |slot| slot.is_none())
            .expect("Can't wait");
        let taken = slot.take().unwrap();
        self.is_empty.notify_one();
        taken
    }
}

为了提高并发性能,您可以在缓冲区中添加更多槽位(slot)。以下示例还支持多个生产者和消费者。

use std::sync::{Arc, Condvar, Mutex, MutexGuard};
use std::thread;

/// Number of slots in `BufferInner::data`; the read and write indices wrap
/// around modulo this value.
const MAX: usize = 10;

/// Bounded multi-slot buffer: the slot storage and its counters live behind
/// `inner`, and two condition variables signal changes to it.
struct Buffer {
    inner: Mutex<BufferInner>,
    /// Notified by `producer` after it has stored a value.
    fill_cond: Condvar,
    /// Notified by `consumer` after it has removed a value.
    empty_cond: Condvar,
}

impl Buffer {
    /// Creates an empty buffer: every slot is `None` and all counters are
    /// zero.
    fn new() -> Self {
        let empty_state = BufferInner {
            data: [None; MAX],
            filled: 0,
            used: 0,
            count: 0,
        };
        Self {
            inner: Mutex::new(empty_state),
            fill_cond: Condvar::new(),
            empty_cond: Condvar::new(),
        }
    }
}

/// Slot storage and counters that `Buffer` keeps behind its mutex.
///
/// Invariant: `count <= MAX`, the `count` slots starting at `used` (wrapping
/// around) are `Some`, and every other slot is `None`.
struct BufferInner {
    /// Slot storage; a slot is `Some` only while it holds an unread value.
    data: [Option<i32>; MAX],
    /// Index of the slot the next `put` writes to.
    filled: usize,
    /// Index of the slot the next `get` reads from.
    used: usize,
    /// Number of values currently stored.
    count: usize,
}

impl BufferInner {
    /// Stores `value` at the write position and advances it.
    ///
    /// # Panics
    ///
    /// Panics if the buffer is already full. Callers are expected to wait
    /// until `count < MAX`; writing anyway would silently overwrite an unread
    /// value and push `count` past `MAX`.
    fn put(&mut self, value: i32) {
        assert!(self.count < MAX, "put called on a full buffer");
        self.data[self.filled] = Some(value);
        self.filled = (self.filled + 1) % MAX;
        self.count += 1;
    }

    /// Removes and returns the oldest stored value.
    ///
    /// The slot is cleared with `take`, so a consumed value can never be read
    /// a second time, and emptiness is detected before any index or counter
    /// is touched.
    ///
    /// # Panics
    ///
    /// Panics if the buffer is empty. Callers are expected to wait until
    /// `count > 0`.
    fn get(&mut self) -> i32 {
        let value = self.data[self.used]
            .take()
            .expect("get called on an empty buffer");
        self.used = (self.used + 1) % MAX;
        self.count -= 1;
        value
    }
}

/// Pushes the values `0..20` into `buffer`, sleeping on `empty_cond` whenever
/// all `MAX` slots are occupied, and notifying `fill_cond` after each push.
fn producer(buffer: &Buffer) {
    for value in 0..20 {
        // Take the lock, then sleep for as long as the buffer is full.
        let mut state = buffer
            .empty_cond
            .wait_while(buffer.inner.lock().unwrap(), |inner| inner.count == MAX)
            .unwrap();

        state.put(value);
        // Printed while the lock is still held; the guard is released at the
        // end of the iteration.
        println!("producer: {}", value);
        buffer.fill_cond.notify_one();
    }
}

/// Takes 20 values out of `buffer`, sleeping on `fill_cond` whenever it is
/// empty, and notifying `empty_cond` after each removal.
fn consumer(buffer: &Buffer) {
    for _ in 0..20 {
        // Take the lock, then sleep for as long as there is nothing to read.
        let mut state: MutexGuard<BufferInner> = buffer
            .fill_cond
            .wait_while(buffer.inner.lock().unwrap(), |inner| inner.count == 0_usize)
            .unwrap();

        // Printed while the lock is still held; the guard is released at the
        // end of the iteration.
        println!("consumer: {}", state.get());
        buffer.empty_cond.notify_one();
    }
}

/// Starts one producer and one consumer thread on a shared `Buffer` and waits
/// for both, producer first.
fn main() {
    let for_producer = Arc::new(Buffer::new());
    let for_consumer = Arc::clone(&for_producer);

    let workers = [
        thread::spawn(move || producer(&for_producer)),
        thread::spawn(move || consumer(&for_consumer)),
    ];

    // Join in spawn order; a panic in either thread is propagated here.
    for worker in workers {
        worker.join().unwrap();
    }
}