如何使用两个 condvars 创建互斥量?
How do I create a mutex with two condvars?
我想在 Rust 中构建一个单生产者多消费者示例,其中生产者的未完成项目不得超过 10 个。我用 C 语言建模了一个解决方案,它使用一个互斥量和两个条件变量。一个 condvar 是在没有东西可消费时等待消费者,一个 condvar 是在未消费的项目数大于 10 时等待生产者。C 代码如下。
据我从 Rust 文档中了解到,std::sync::Mutex
和 std::sync::Condvar
之间必须存在 1-1 连接,因此我无法准确翻译我的 C 解决方案。
在 Rust 中使用 std::sync::Mutex
和 std::sync::Condvar
.
是否有其他方法可以达到相同的目的(我看不到)
#define _GNU_SOURCE
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
//
// This is a simple example of using a mutex and 2 condition variables to
// sync a single writer and multiple readers interacting with a bounded (fixed max size) queue
//
// in this toy example a queue is simulated by an int counter n_resource
//
int n_resource;
pthread_cond_t rdr_cvar;
pthread_cond_t wrtr_cvar;
pthread_mutex_t mutex;
void reader(void* data)
{
long id = (long)data;
for(;;) {
pthread_mutex_lock(&mutex);
while (n_resource <= 0) {
pthread_cond_wait(&rdr_cvar, &mutex);
}
printf("Reader %ld n_resource = %d\n", id, n_resource);
--n_resource;
// if there are still things to read - singla one reader
if(n_resource > 0) {
pthread_cond_signal(&rdr_cvar);
}
// if there is space for the writer to add another signal the writer
if(n_resource < 10) {
pthread_cond_signal(&wrtr_cvar);
}
pthread_mutex_unlock(&mutex);
}
}
void writer(void* data)
{
for(;;) {
pthread_mutex_lock(&mutex);
printf("Writer before while n_resource %d \n", n_resource);
while (n_resource > 10) {
pthread_cond_wait(&wrtr_cvar, &mutex);
}
printf("Writer after while n_resource %d \n", n_resource);
++n_resource;
// if there is something for a reader to read signal one of the readers.
if(n_resource > 0) {
pthread_cond_signal(&rdr_cvar);
}
pthread_mutex_unlock(&mutex);
}
}
int main()
{
pthread_t rdr_thread_1;
pthread_t rdr_thread_2;
pthread_t wrtr_thread;
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&rdr_cvar, NULL);
pthread_cond_init(&wrtr_cvar, NULL);
pthread_create(&rdr_thread_1, NULL, &reader, (void*)1L);
pthread_create(&rdr_thread_2, NULL, &reader, (void*)2L);
pthread_create(&wrtr_thread, NULL, &writer, NULL);
pthread_join(wrtr_thread, NULL);
pthread_join(rdr_thread_1, NULL);
pthread_join(rdr_thread_2, NULL);
}
虽然 CondVar
只需要关联一个 Mutex
,但 Mutex
不必只关联一个 CondVar
.
例如,以下代码似乎工作正常 - 您可以 运行 它在 playground。
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
struct Q {
rdr_cvar: Condvar,
wrtr_cvar: Condvar,
mutex: Mutex<i32>,
}
impl Q {
pub fn new() -> Q {
Q {
rdr_cvar: Condvar::new(),
wrtr_cvar: Condvar::new(),
mutex: Mutex::new(0),
}
}
}
fn writer(id: i32, qq: Arc<Q>) {
let q = &*qq;
for i in 0..10 {
let guard = q.mutex.lock().unwrap();
let mut guard = q.wrtr_cvar.wait_while(guard, |n| *n > 3).unwrap();
println!("{}: Writer {} n_resource = {}\n", i, id, *guard);
*guard += 1;
if *guard > 0 {
q.rdr_cvar.notify_one();
}
if *guard < 10 {
q.wrtr_cvar.notify_one();
}
}
}
fn reader(id: i32, qq: Arc<Q>) {
let q = &*qq;
for i in 0..10 {
let guard = q.mutex.lock().unwrap();
let mut guard = q.rdr_cvar.wait_while(guard, |n| *n <= 0).unwrap();
println!("{} Reader {} n_resource = {}\n", i, id, *guard);
*guard -= 1;
if *guard > 0 {
q.rdr_cvar.notify_one();
}
if *guard < 10 {
q.wrtr_cvar.notify_one();
}
}
}
fn main() {
let data = Arc::new(Q::new());
let data2 = data.clone();
let t1 = thread::spawn(move || writer(0, data2));
let t2 = thread::spawn(move || reader(1, data));
t1.join().unwrap();
t2.join().unwrap();
}
我想在 Rust 中构建一个单生产者多消费者示例,其中生产者的未完成项目不得超过 10 个。我用 C 语言建模了一个解决方案,它使用一个互斥量和两个条件变量。一个 condvar 是在没有东西可消费时等待消费者,一个 condvar 是在未消费的项目数大于 10 时等待生产者。C 代码如下。
据我从 Rust 文档中了解到,std::sync::Mutex
和 std::sync::Condvar
之间必须存在 1-1 连接,因此我无法准确翻译我的 C 解决方案。
在 Rust 中使用 std::sync::Mutex
和 std::sync::Condvar
.
#define _GNU_SOURCE
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
//
// This is a simple example of using a mutex and 2 condition variables to
// sync a single writer and multiple readers interacting with a bounded (fixed max size) queue
//
// in this toy example a queue is simulated by an int counter n_resource
//
int n_resource;
pthread_cond_t rdr_cvar;
pthread_cond_t wrtr_cvar;
pthread_mutex_t mutex;
void reader(void* data)
{
long id = (long)data;
for(;;) {
pthread_mutex_lock(&mutex);
while (n_resource <= 0) {
pthread_cond_wait(&rdr_cvar, &mutex);
}
printf("Reader %ld n_resource = %d\n", id, n_resource);
--n_resource;
// if there are still things to read - singla one reader
if(n_resource > 0) {
pthread_cond_signal(&rdr_cvar);
}
// if there is space for the writer to add another signal the writer
if(n_resource < 10) {
pthread_cond_signal(&wrtr_cvar);
}
pthread_mutex_unlock(&mutex);
}
}
void writer(void* data)
{
for(;;) {
pthread_mutex_lock(&mutex);
printf("Writer before while n_resource %d \n", n_resource);
while (n_resource > 10) {
pthread_cond_wait(&wrtr_cvar, &mutex);
}
printf("Writer after while n_resource %d \n", n_resource);
++n_resource;
// if there is something for a reader to read signal one of the readers.
if(n_resource > 0) {
pthread_cond_signal(&rdr_cvar);
}
pthread_mutex_unlock(&mutex);
}
}
int main()
{
pthread_t rdr_thread_1;
pthread_t rdr_thread_2;
pthread_t wrtr_thread;
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&rdr_cvar, NULL);
pthread_cond_init(&wrtr_cvar, NULL);
pthread_create(&rdr_thread_1, NULL, &reader, (void*)1L);
pthread_create(&rdr_thread_2, NULL, &reader, (void*)2L);
pthread_create(&wrtr_thread, NULL, &writer, NULL);
pthread_join(wrtr_thread, NULL);
pthread_join(rdr_thread_1, NULL);
pthread_join(rdr_thread_2, NULL);
}
虽然 CondVar
只需要关联一个 Mutex
,但 Mutex
不必只关联一个 CondVar
.
例如,以下代码似乎工作正常 - 您可以 运行 它在 playground。
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
struct Q {
rdr_cvar: Condvar,
wrtr_cvar: Condvar,
mutex: Mutex<i32>,
}
impl Q {
pub fn new() -> Q {
Q {
rdr_cvar: Condvar::new(),
wrtr_cvar: Condvar::new(),
mutex: Mutex::new(0),
}
}
}
fn writer(id: i32, qq: Arc<Q>) {
let q = &*qq;
for i in 0..10 {
let guard = q.mutex.lock().unwrap();
let mut guard = q.wrtr_cvar.wait_while(guard, |n| *n > 3).unwrap();
println!("{}: Writer {} n_resource = {}\n", i, id, *guard);
*guard += 1;
if *guard > 0 {
q.rdr_cvar.notify_one();
}
if *guard < 10 {
q.wrtr_cvar.notify_one();
}
}
}
fn reader(id: i32, qq: Arc<Q>) {
let q = &*qq;
for i in 0..10 {
let guard = q.mutex.lock().unwrap();
let mut guard = q.rdr_cvar.wait_while(guard, |n| *n <= 0).unwrap();
println!("{} Reader {} n_resource = {}\n", i, id, *guard);
*guard -= 1;
if *guard > 0 {
q.rdr_cvar.notify_one();
}
if *guard < 10 {
q.wrtr_cvar.notify_one();
}
}
}
fn main() {
let data = Arc::new(Q::new());
let data2 = data.clone();
let t1 = thread::spawn(move || writer(0, data2));
let t2 = thread::spawn(move || reader(1, data));
t1.join().unwrap();
t2.join().unwrap();
}