在 Rust 中使用 Mutex 和 Condvar 进行缓冲
Buffer in Rust with Mutex and Condvar
我正在尝试实现一个只有一个消费者和一个生产者的缓冲区。我只使用了 POSIX 信号量,但是,它们在 Rust 中不可用,我正在尝试使用 Rust 同步原语(Mutex
、Condvar
、Barrier
, ...) 但我不想使用频道。
我的代码表现得太不规则了,有些情况下运行良好,有些情况下它只是停在某个数字上,而在其他情况下它只是不开始计数。
如果我在主线程中等待 1 秒直到发送 Condvar
通知,事情似乎会更好,但它不能保证它不会进入死锁。
如何修复这个程序?我理解 Condvar
错了吗?
use std::thread;
use std::sync::{Arc, Condvar, Mutex};
struct Buffer {
is_data: Mutex<bool>,
is_data_cv: Condvar,
is_space: Mutex<bool>,
is_space_cv: Condvar,
buffer: Mutex<i32>,
}
fn producer(buffer: Arc<Buffer>) {
for i in 0..50 {
loop {
let mut is_space = buffer
.is_space_cv
.wait(buffer.is_space.lock().unwrap())
.unwrap();
if *is_space {
{
let mut hueco = buffer.buffer.lock().unwrap();
*hueco = i;
}
*is_space = false;
{
let mut is_data = buffer.is_data.lock().unwrap();
*is_data = true;
}
buffer.is_data_cv.notify_one();
break;
}
}
}
}
fn consumer(buffer: Arc<Buffer>) {
for i in 0..50 {
loop {
let mut is_data = buffer
.is_data_cv
.wait(buffer.is_data.lock().unwrap())
.unwrap();
if *is_data {
{
let hueco = buffer.buffer.lock().unwrap();
println!("{}", *hueco);
}
*is_data = false;
{
let mut is_space = buffer.is_space.lock().unwrap();
*is_space = true;
}
buffer.is_space_cv.notify_one();
break;
}
}
}
}
fn main() {
let buffer = Arc::new(Buffer {
is_data: Mutex::new(false),
is_data_cv: Condvar::new(),
is_space: Mutex::new(true),
is_space_cv: Condvar::new(),
buffer: Mutex::new(0),
});
let b = buffer.clone();
let p = thread::spawn(move || {
producer(b);
});
let b = buffer.clone();
let c = thread::spawn(move || {
consumer(b);
});
//thread::sleep_ms(1000);
buffer.is_space_cv.notify_one();
c.join();
}
我鼓励您创建更小的方法并重用现有的 Rust 类型,例如 Option
。这将使您的代码大大简化 — 只有一个 Mutex
和一个 Condvar
:
use std::thread;
use std::sync::{Arc, Condvar, Mutex};
#[derive(Debug, Default)]
struct Buffer {
data: Mutex<Option<i32>>,
data_cv: Condvar,
}
impl Buffer {
fn insert(&self, val: i32) {
let mut lock = self.data.lock().expect("Can't lock");
while lock.is_some() {
lock = self.data_cv.wait(lock).expect("Can't wait");
}
*lock = Some(val);
self.data_cv.notify_one();
}
fn remove(&self) -> i32 {
let mut lock = self.data.lock().expect("Can't lock");
while lock.is_none() {
lock = self.data_cv.wait(lock).expect("Can't wait");
}
let val = lock.take().unwrap();
self.data_cv.notify_one();
val
}
}
fn producer(buffer: &Buffer) {
for i in 0..50 {
println!("p: {}", i);
buffer.insert(i);
}
}
fn consumer(buffer: &Buffer) {
for _ in 0..50 {
let val = buffer.remove();
println!("c: {}", val);
}
}
fn main() {
let buffer = Arc::new(Buffer::default());
let b = buffer.clone();
let p = thread::spawn(move || {
producer(&b);
});
let b = buffer.clone();
let c = thread::spawn(move || {
consumer(&b);
});
c.join().expect("Consumer had an error");
p.join().expect("Producer had an error");
}
如果您想获得更高的性能(通过基准测试看是否值得),您可以分别为 "empty" 和 "full" 条件设置 Condvar
s:
#[derive(Debug, Default)]
struct Buffer {
data: Mutex<Option<i32>>,
is_empty: Condvar,
is_full: Condvar,
}
impl Buffer {
fn insert(&self, val: i32) {
let mut lock = self.data.lock().expect("Can't lock");
while lock.is_some() {
lock = self.is_empty.wait(lock).expect("Can't wait");
}
*lock = Some(val);
self.is_full.notify_one();
}
fn remove(&self) -> i32 {
let mut lock = self.data.lock().expect("Can't lock");
while lock.is_none() {
lock = self.is_full.wait(lock).expect("Can't wait");
}
let val = lock.take().unwrap();
self.is_empty.notify_one();
val
}
}
为了提高并发性能,您可以在缓冲区中添加更多slots。以下示例还支持多个生产者和消费者。
use std::sync::{Arc, Condvar, Mutex, MutexGuard};
use std::thread;
const MAX: usize = 10;
struct Buffer {
inner: Mutex<BufferInner>,
fill_cond: Condvar,
empty_cond: Condvar,
}
impl Buffer {
fn new() -> Self {
Buffer {
inner: Mutex::new(BufferInner {
data: [Option::None; MAX],
filled: 0,
used: 0,
count: 0,
}),
fill_cond: Condvar::new(),
empty_cond: Condvar::new(),
}
}
}
struct BufferInner {
data: [Option<i32>; MAX],
filled: usize,
used: usize,
count: usize,
}
impl BufferInner {
fn put(&mut self, value: i32) {
self.data[self.filled] = Some(value);
self.filled = (self.filled + 1) % MAX;
self.count += 1;
}
fn get(&mut self) -> i32 {
let tmp: Option<i32> = self.data[self.used];
self.used = (self.used + 1) % MAX;
self.count -= 1;
tmp.unwrap()
}
}
fn producer(buffer: &Buffer) {
for i in 0..20 {
let mut guard = buffer.inner.lock().unwrap();
while guard.count == MAX {
guard = buffer.empty_cond.wait(guard).unwrap();
}
guard.put(i);
println!("producer: {}", i);
buffer.fill_cond.notify_one();
}
}
fn consumer(buffer: &Buffer) {
for _ in 0..20 {
let mut guard: MutexGuard<BufferInner> = buffer.inner.lock().unwrap();
while guard.count == 0_usize {
guard = buffer.fill_cond.wait(guard).unwrap();
}
let value = guard.get();
println!("consumer: {}", value);
buffer.empty_cond.notify_one();
}
}
fn main() {
let buffer = Arc::new(Buffer::new());
let buffer1 = Arc::clone(&buffer);
let p1 = thread::spawn(move || producer(&buffer));
let c1 = thread::spawn(move || consumer(&buffer1));
p1.join().unwrap();
c1.join().unwrap();
}
我正在尝试实现一个只有一个消费者和一个生产者的缓冲区。我只使用了 POSIX 信号量,但是,它们在 Rust 中不可用,我正在尝试使用 Rust 同步原语(Mutex
、Condvar
、Barrier
, ...) 但我不想使用频道。
我的代码表现得太不规则了,有些情况下运行良好,有些情况下它只是停在某个数字上,而在其他情况下它只是不开始计数。
如果我在主线程中等待 1 秒直到发送 Condvar
通知,事情似乎会更好,但它不能保证它不会进入死锁。
如何修复这个程序?我理解 Condvar
错了吗?
use std::thread;
use std::sync::{Arc, Condvar, Mutex};
struct Buffer {
is_data: Mutex<bool>,
is_data_cv: Condvar,
is_space: Mutex<bool>,
is_space_cv: Condvar,
buffer: Mutex<i32>,
}
fn producer(buffer: Arc<Buffer>) {
for i in 0..50 {
loop {
let mut is_space = buffer
.is_space_cv
.wait(buffer.is_space.lock().unwrap())
.unwrap();
if *is_space {
{
let mut hueco = buffer.buffer.lock().unwrap();
*hueco = i;
}
*is_space = false;
{
let mut is_data = buffer.is_data.lock().unwrap();
*is_data = true;
}
buffer.is_data_cv.notify_one();
break;
}
}
}
}
fn consumer(buffer: Arc<Buffer>) {
for i in 0..50 {
loop {
let mut is_data = buffer
.is_data_cv
.wait(buffer.is_data.lock().unwrap())
.unwrap();
if *is_data {
{
let hueco = buffer.buffer.lock().unwrap();
println!("{}", *hueco);
}
*is_data = false;
{
let mut is_space = buffer.is_space.lock().unwrap();
*is_space = true;
}
buffer.is_space_cv.notify_one();
break;
}
}
}
}
fn main() {
let buffer = Arc::new(Buffer {
is_data: Mutex::new(false),
is_data_cv: Condvar::new(),
is_space: Mutex::new(true),
is_space_cv: Condvar::new(),
buffer: Mutex::new(0),
});
let b = buffer.clone();
let p = thread::spawn(move || {
producer(b);
});
let b = buffer.clone();
let c = thread::spawn(move || {
consumer(b);
});
//thread::sleep_ms(1000);
buffer.is_space_cv.notify_one();
c.join();
}
我鼓励您创建更小的方法并重用现有的 Rust 类型,例如 Option
。这将使您的代码大大简化 — 只有一个 Mutex
和一个 Condvar
:
use std::thread;
use std::sync::{Arc, Condvar, Mutex};
#[derive(Debug, Default)]
struct Buffer {
data: Mutex<Option<i32>>,
data_cv: Condvar,
}
impl Buffer {
fn insert(&self, val: i32) {
let mut lock = self.data.lock().expect("Can't lock");
while lock.is_some() {
lock = self.data_cv.wait(lock).expect("Can't wait");
}
*lock = Some(val);
self.data_cv.notify_one();
}
fn remove(&self) -> i32 {
let mut lock = self.data.lock().expect("Can't lock");
while lock.is_none() {
lock = self.data_cv.wait(lock).expect("Can't wait");
}
let val = lock.take().unwrap();
self.data_cv.notify_one();
val
}
}
fn producer(buffer: &Buffer) {
for i in 0..50 {
println!("p: {}", i);
buffer.insert(i);
}
}
fn consumer(buffer: &Buffer) {
for _ in 0..50 {
let val = buffer.remove();
println!("c: {}", val);
}
}
fn main() {
let buffer = Arc::new(Buffer::default());
let b = buffer.clone();
let p = thread::spawn(move || {
producer(&b);
});
let b = buffer.clone();
let c = thread::spawn(move || {
consumer(&b);
});
c.join().expect("Consumer had an error");
p.join().expect("Producer had an error");
}
如果您想获得更高的性能(通过基准测试看是否值得),您可以分别为 "empty" 和 "full" 条件设置 Condvar
s:
#[derive(Debug, Default)]
struct Buffer {
data: Mutex<Option<i32>>,
is_empty: Condvar,
is_full: Condvar,
}
impl Buffer {
fn insert(&self, val: i32) {
let mut lock = self.data.lock().expect("Can't lock");
while lock.is_some() {
lock = self.is_empty.wait(lock).expect("Can't wait");
}
*lock = Some(val);
self.is_full.notify_one();
}
fn remove(&self) -> i32 {
let mut lock = self.data.lock().expect("Can't lock");
while lock.is_none() {
lock = self.is_full.wait(lock).expect("Can't wait");
}
let val = lock.take().unwrap();
self.is_empty.notify_one();
val
}
}
为了提高并发性能,您可以在缓冲区中添加更多slots。以下示例还支持多个生产者和消费者。
use std::sync::{Arc, Condvar, Mutex, MutexGuard};
use std::thread;
const MAX: usize = 10;
struct Buffer {
inner: Mutex<BufferInner>,
fill_cond: Condvar,
empty_cond: Condvar,
}
impl Buffer {
fn new() -> Self {
Buffer {
inner: Mutex::new(BufferInner {
data: [Option::None; MAX],
filled: 0,
used: 0,
count: 0,
}),
fill_cond: Condvar::new(),
empty_cond: Condvar::new(),
}
}
}
struct BufferInner {
data: [Option<i32>; MAX],
filled: usize,
used: usize,
count: usize,
}
impl BufferInner {
fn put(&mut self, value: i32) {
self.data[self.filled] = Some(value);
self.filled = (self.filled + 1) % MAX;
self.count += 1;
}
fn get(&mut self) -> i32 {
let tmp: Option<i32> = self.data[self.used];
self.used = (self.used + 1) % MAX;
self.count -= 1;
tmp.unwrap()
}
}
fn producer(buffer: &Buffer) {
for i in 0..20 {
let mut guard = buffer.inner.lock().unwrap();
while guard.count == MAX {
guard = buffer.empty_cond.wait(guard).unwrap();
}
guard.put(i);
println!("producer: {}", i);
buffer.fill_cond.notify_one();
}
}
fn consumer(buffer: &Buffer) {
for _ in 0..20 {
let mut guard: MutexGuard<BufferInner> = buffer.inner.lock().unwrap();
while guard.count == 0_usize {
guard = buffer.fill_cond.wait(guard).unwrap();
}
let value = guard.get();
println!("consumer: {}", value);
buffer.empty_cond.notify_one();
}
}
fn main() {
let buffer = Arc::new(Buffer::new());
let buffer1 = Arc::clone(&buffer);
let p1 = thread::spawn(move || producer(&buffer));
let c1 = thread::spawn(move || consumer(&buffer1));
p1.join().unwrap();
c1.join().unwrap();
}