为什么 Condvar 线程永远不会醒来?
Why does Condvar thread never wake up?
如果我取消注释 do_work
函数中的 sleep
行,此代码将运行完成并打印出 my_data
的值。如果我将其注释掉,我的可执行文件每次都会挂起。
有一个相关问题提到要收集线程句柄并在主线程中等待它们 join,但这应该由 rayon 的 scope 来处理,对吗?
如果没有 do_work()
中的 sleep
语句,我如何才能完成此代码?
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc, Barrier, Condvar, Mutex,
},
thread,
time::Duration,
};
/// Worker loop for the mutex/condvar variant.
///
/// Until `quitting` is observed as `true`, every iteration
/// 1. waits on `barrier`,
/// 2. locks `mtx` and blocks on `cond_var` until the guarded flag is `true`
///    or `quitting` is set,
/// 3. leaves the loop if `quitting` is set, otherwise adds `1.0` to every
///    element of `range`.
///
/// Before returning it prints the current thread id followed by `Joining`.
///
/// Two properties of this loop matter to callers:
/// * the mutex guard stays alive while `range` is updated, so workers that
///   share `mtx` perform their increments one at a time;
/// * `quitting` is only re-read when the condvar wait wakes up. Storing
///   `true` into `quitting` without a notification on `cond_var` does not
///   wake a worker that is already blocked in the wait, and a worker that is
///   already blocked in `barrier.wait()` does not look at `quitting` at all.
///
/// # Panics
/// Panics if `mtx` is poisoned.
fn do_work(
    mtx: Arc<Mutex<bool>>,
    cond_var: Arc<Condvar>,
    barrier: Arc<Barrier>,
    quitting: &AtomicBool,
    range: &mut [f32],
) {
    loop {
        if quitting.load(Ordering::SeqCst) {
            break;
        }

        barrier.wait();
        // Deliberately left commented out: this is the `sleep` line the
        // surrounding text talks about.
        //thread::sleep(Duration::from_micros(1));

        // Block while the flag is still false and no shutdown was requested.
        // The returned guard is kept until the end of the iteration.
        let _guard = cond_var
            .wait_while(mtx.lock().unwrap(), |started| {
                !*started && !quitting.load(Ordering::SeqCst)
            })
            .unwrap();

        if quitting.load(Ordering::SeqCst) {
            break;
        }
        for value in range.iter_mut() {
            *value += 1.0;
        }
    }
    println!("{:?} Joining", thread::current().id());
}
/// Sets the flag guarded by `mtx` to `true` and wakes every thread waiting on
/// `cond_var`.
///
/// The notification is sent while the mutex is still locked; the lock is
/// released afterwards.
///
/// # Panics
/// Panics if `mtx` is poisoned.
fn start_work(mtx: Arc<Mutex<bool>>, cond_var: Arc<Condvar>) {
    let mut flag = mtx.lock().unwrap();
    *flag = true;
    cond_var.notify_all();
    drop(flag);
}
/// Sets the flag guarded by `mtx` back to `false`.
///
/// No condvar is notified here.
///
/// # Panics
/// Panics if `mtx` is poisoned.
fn reset_work(mtx: Arc<Mutex<bool>>) {
    // The temporary guard is released at the end of this statement.
    *mtx.lock().unwrap() = false;
}
/// Runs the mutex/condvar experiment: four `do_work` tasks, ten rounds.
///
/// `my_data` holds 1024 values initialised to their own index. It is split
/// into chunks of `len / num_threads` elements and each chunk is handed to a
/// `do_work` task spawned inside `rayon::scope`. The scope closure itself is
/// the extra participant counted in `Barrier::new(num_threads + 1)`.
///
/// After an initial barrier wait, each round calls `start_work`, waits on the
/// barrier, then calls `reset_work`. Finally `quitting` is set, and once the
/// scope returns the data is printed.
///
/// NOTE(review): the workers block on a std `Barrier` inside rayon tasks, so
/// this presumably needs the rayon pool to provide at least
/// `num_threads + 1` threads — confirm.
fn main() {
    let num_threads = 4;
    let test_barrier = Arc::new(Barrier::new(num_threads + 1));
    let test_mutex = Arc::new(Mutex::new(false));
    let test_cond_var = Arc::new(Condvar::new());
    let quitting = AtomicBool::new(false);

    // Element `i` starts out as `i as f32`.
    let mut my_data: Vec<f32> = (0..1024).map(|i| i as f32).collect();
    let chunk_size = my_data.len() / num_threads;

    rayon::scope(|s| {
        let quit_flag = &quitting;
        for chunk in my_data.chunks_mut(chunk_size) {
            let worker_mtx = Arc::clone(&test_mutex);
            let worker_cond_var = Arc::clone(&test_cond_var);
            let worker_barrier = Arc::clone(&test_barrier);
            s.spawn(move |_| {
                do_work(worker_mtx, worker_cond_var, worker_barrier, quit_flag, chunk)
            });
        }

        // First rendezvous with the workers before any round is started.
        test_barrier.wait();
        let _upper_bound = 1024 / num_threads;
        for _round in 0..10 {
            start_work(Arc::clone(&test_mutex), Arc::clone(&test_cond_var));
            test_barrier.wait();
            // The flag is cleared only after the barrier has released the
            // workers, so a worker may lock the mutex before or after this.
            reset_work(Arc::clone(&test_mutex));
        }
        // The shutdown flag is stored without locking `test_mutex` and
        // without notifying `test_cond_var`.
        quitting.store(true, Ordering::SeqCst);
    });
    println!("my_data is: {:?}", my_data);
}
Cargo.toml 依赖关系:
rayon = "*"
这只是一个测试:do_work 稍后会执行更复杂的数学运算,而我目前只是想让一组线程各自成功修改一个更大的 Vec 中属于自己的那一部分。
下面的代码可以按我预期的方式运行。它看起来特别绕,应该有更好的办法。我很乐意接受一个比我这个方案更简单的答案,但它至少实现了所需的行为:一个线程已预先启动的线程池,只要收到主线程发出的相应信号就立即突发执行工作,并且有确定性的关闭方式。额外的开始和结束屏障(以及对应的原子标志)提供了一种握手机制,以确保不会出现竞争条件。
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Barrier,
},
thread,
};
/// Worker loop for the barrier/atomic-flag handshake.
///
/// Until `quitting` is observed as `true`, every round
/// 1. waits on `start_barrier`,
/// 2. spins until `starting` or `quitting` becomes `true`,
/// 3. leaves the loop if `quitting` is set, otherwise adds `1.0` to every
///    element of `range`,
/// 4. waits on `finish_barrier`,
/// 5. spins until `finishing` or `quitting` becomes `true`.
///
/// Before returning it prints the current thread id followed by `Joining`.
///
/// The coordinator is expected to take part in both barriers and to toggle
/// `starting` / `finishing` between them; this function only reads the flags.
fn do_work(
    start_barrier: &Barrier,
    finish_barrier: &Barrier,
    quitting: &AtomicBool,
    starting: &AtomicBool,
    finishing: &AtomicBool,
    range: &mut [f32],
) {
    while !quitting.load(Ordering::SeqCst) {
        start_barrier.wait();
        while !starting.load(Ordering::SeqCst) && !quitting.load(Ordering::SeqCst) {
            // Tell the CPU this is a busy-wait so the spin is cheaper.
            std::hint::spin_loop();
        }

        if quitting.load(Ordering::SeqCst) {
            break;
        }
        range.iter_mut().for_each(|value| *value += 1.0);

        finish_barrier.wait();
        while !finishing.load(Ordering::SeqCst) && !quitting.load(Ordering::SeqCst) {
            std::hint::spin_loop();
        }
    }
    println!("{:?} Joining", thread::current().id());
}
/// Runs the barrier/atomic-flag experiment: four `do_work` tasks, ten rounds.
///
/// `my_data` holds 1024 values initialised to their own index and is split
/// into `num_threads` chunks, one per worker. Each round the coordinator
/// 1. meets the workers on `start_barrier`,
/// 2. clears `finishing` and raises `starting` so the workers begin,
/// 3. meets them again on `finish_barrier` once their work is done,
/// 4. on the last round raises `quitting`,
/// 5. raises `finishing` and clears `starting`, then prints the round time.
///
/// After the scope returns, the data is printed.
///
/// # Panics
/// Panics if the rayon thread pool cannot be built.
fn main() {
    let num_threads = 4;
    let start_barrier = Barrier::new(num_threads + 1);
    let finish_barrier = Barrier::new(num_threads + 1);
    let mut my_data = vec![0.0; 1024];
    my_data
        .iter_mut()
        .enumerate()
        .for_each(|(i, iter)| *iter = i as f32);
    // 1024 / 4 divides evenly, so `chunks_mut` yields exactly `num_threads`
    // chunks, matching the barrier size.
    let chunk_size = my_data.len() / num_threads;
    let quitting = AtomicBool::new(false);
    let starting = AtomicBool::new(false);
    let finishing = AtomicBool::new(false);

    // The coordinator closure and all workers run on pool threads and block
    // on std barriers. The default global pool is sized to the CPU count, so
    // with fewer than `num_threads + 1` threads the barrier could never fill.
    // A dedicated pool guarantees one thread per participant.
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(num_threads + 1)
        .build()
        .expect("failed to build the rayon thread pool");

    pool.scope(|s| {
        for chunk in my_data.chunks_mut(chunk_size) {
            let thread_start_barrier = &start_barrier;
            let thread_finish_barrier = &finish_barrier;
            let thread_quitting = &quitting;
            let thread_starting = &starting;
            let thread_finishing = &finishing;
            s.spawn(move |_| {
                do_work(
                    thread_start_barrier,
                    thread_finish_barrier,
                    thread_quitting,
                    thread_starting,
                    thread_finishing,
                    chunk,
                )
            });
        }

        let num_rounds = 10;
        for i in 0..num_rounds {
            let start = std::time::Instant::now();
            start_barrier.wait();
            finishing.store(false, Ordering::SeqCst);
            starting.store(true, Ordering::SeqCst);
            finish_barrier.wait();
            if i == num_rounds - 1 {
                // Raised before `finishing` so workers leaving their final
                // spin see the shutdown request.
                quitting.store(true, Ordering::SeqCst);
            }
            finishing.store(true, Ordering::SeqCst);
            starting.store(false, Ordering::SeqCst);
            println!("Round {} took: {:?}", i, start.elapsed());
        }
    });
    println!("my_data is: {:?}", my_data);
}
如果我取消注释 do_work
函数中的 sleep
行,此代码将运行完成并打印出 my_data
的值。如果我将其注释掉,我的可执行文件每次都会挂起。
如果没有 do_work()
中的 sleep
语句,我如何才能完成此代码?
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc, Barrier, Condvar, Mutex,
},
thread,
time::Duration,
};
/// Worker loop for the mutex/condvar variant.
///
/// Until `quitting` is observed as `true`, every iteration
/// 1. waits on `barrier`,
/// 2. locks `mtx` and blocks on `cond_var` until the guarded flag is `true`
///    or `quitting` is set,
/// 3. leaves the loop if `quitting` is set, otherwise adds `1.0` to every
///    element of `range`.
///
/// Before returning it prints the current thread id followed by `Joining`.
///
/// Two properties of this loop matter to callers:
/// * the mutex guard stays alive while `range` is updated, so workers that
///   share `mtx` perform their increments one at a time;
/// * `quitting` is only re-read when the condvar wait wakes up. Storing
///   `true` into `quitting` without a notification on `cond_var` does not
///   wake a worker that is already blocked in the wait, and a worker that is
///   already blocked in `barrier.wait()` does not look at `quitting` at all.
///
/// # Panics
/// Panics if `mtx` is poisoned.
fn do_work(
    mtx: Arc<Mutex<bool>>,
    cond_var: Arc<Condvar>,
    barrier: Arc<Barrier>,
    quitting: &AtomicBool,
    range: &mut [f32],
) {
    loop {
        if quitting.load(Ordering::SeqCst) {
            break;
        }

        barrier.wait();
        // Deliberately left commented out: this is the `sleep` line the
        // surrounding text talks about.
        //thread::sleep(Duration::from_micros(1));

        // Block while the flag is still false and no shutdown was requested.
        // The returned guard is kept until the end of the iteration.
        let _guard = cond_var
            .wait_while(mtx.lock().unwrap(), |started| {
                !*started && !quitting.load(Ordering::SeqCst)
            })
            .unwrap();

        if quitting.load(Ordering::SeqCst) {
            break;
        }
        for value in range.iter_mut() {
            *value += 1.0;
        }
    }
    println!("{:?} Joining", thread::current().id());
}
/// Sets the flag guarded by `mtx` to `true` and wakes every thread waiting on
/// `cond_var`.
///
/// The notification is sent while the mutex is still locked; the lock is
/// released afterwards.
///
/// # Panics
/// Panics if `mtx` is poisoned.
fn start_work(mtx: Arc<Mutex<bool>>, cond_var: Arc<Condvar>) {
    let mut flag = mtx.lock().unwrap();
    *flag = true;
    cond_var.notify_all();
    drop(flag);
}
/// Sets the flag guarded by `mtx` back to `false`.
///
/// No condvar is notified here.
///
/// # Panics
/// Panics if `mtx` is poisoned.
fn reset_work(mtx: Arc<Mutex<bool>>) {
    // The temporary guard is released at the end of this statement.
    *mtx.lock().unwrap() = false;
}
/// Runs the mutex/condvar experiment: four `do_work` tasks, ten rounds.
///
/// `my_data` holds 1024 values initialised to their own index. It is split
/// into chunks of `len / num_threads` elements and each chunk is handed to a
/// `do_work` task spawned inside `rayon::scope`. The scope closure itself is
/// the extra participant counted in `Barrier::new(num_threads + 1)`.
///
/// After an initial barrier wait, each round calls `start_work`, waits on the
/// barrier, then calls `reset_work`. Finally `quitting` is set, and once the
/// scope returns the data is printed.
///
/// NOTE(review): the workers block on a std `Barrier` inside rayon tasks, so
/// this presumably needs the rayon pool to provide at least
/// `num_threads + 1` threads — confirm.
fn main() {
    let num_threads = 4;
    let test_barrier = Arc::new(Barrier::new(num_threads + 1));
    let test_mutex = Arc::new(Mutex::new(false));
    let test_cond_var = Arc::new(Condvar::new());
    let quitting = AtomicBool::new(false);

    // Element `i` starts out as `i as f32`.
    let mut my_data: Vec<f32> = (0..1024).map(|i| i as f32).collect();
    let chunk_size = my_data.len() / num_threads;

    rayon::scope(|s| {
        let quit_flag = &quitting;
        for chunk in my_data.chunks_mut(chunk_size) {
            let worker_mtx = Arc::clone(&test_mutex);
            let worker_cond_var = Arc::clone(&test_cond_var);
            let worker_barrier = Arc::clone(&test_barrier);
            s.spawn(move |_| {
                do_work(worker_mtx, worker_cond_var, worker_barrier, quit_flag, chunk)
            });
        }

        // First rendezvous with the workers before any round is started.
        test_barrier.wait();
        let _upper_bound = 1024 / num_threads;
        for _round in 0..10 {
            start_work(Arc::clone(&test_mutex), Arc::clone(&test_cond_var));
            test_barrier.wait();
            // The flag is cleared only after the barrier has released the
            // workers, so a worker may lock the mutex before or after this.
            reset_work(Arc::clone(&test_mutex));
        }
        // The shutdown flag is stored without locking `test_mutex` and
        // without notifying `test_cond_var`.
        quitting.store(true, Ordering::SeqCst);
    });
    println!("my_data is: {:?}", my_data);
}
Cargo.toml 依赖关系:
rayon = "*"
这只是一个测试:do_work 稍后会执行更复杂的数学运算,而我目前只是想让一组线程各自成功修改一个更大的 Vec 中属于自己的那一部分。
下面的代码可以按我预期的方式运行。它看起来特别绕,应该有更好的办法。我很乐意接受一个比我这个方案更简单的答案,但它至少实现了所需的行为:一个线程已预先启动的线程池,只要收到主线程发出的相应信号就立即突发执行工作,并且有确定性的关闭方式。额外的开始和结束屏障(以及对应的原子标志)提供了一种握手机制,以确保不会出现竞争条件。
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Barrier,
},
thread,
};
/// Worker loop for the barrier/atomic-flag handshake.
///
/// Until `quitting` is observed as `true`, every round
/// 1. waits on `start_barrier`,
/// 2. spins until `starting` or `quitting` becomes `true`,
/// 3. leaves the loop if `quitting` is set, otherwise adds `1.0` to every
///    element of `range`,
/// 4. waits on `finish_barrier`,
/// 5. spins until `finishing` or `quitting` becomes `true`.
///
/// Before returning it prints the current thread id followed by `Joining`.
///
/// The coordinator is expected to take part in both barriers and to toggle
/// `starting` / `finishing` between them; this function only reads the flags.
fn do_work(
    start_barrier: &Barrier,
    finish_barrier: &Barrier,
    quitting: &AtomicBool,
    starting: &AtomicBool,
    finishing: &AtomicBool,
    range: &mut [f32],
) {
    while !quitting.load(Ordering::SeqCst) {
        start_barrier.wait();
        while !starting.load(Ordering::SeqCst) && !quitting.load(Ordering::SeqCst) {
            // Tell the CPU this is a busy-wait so the spin is cheaper.
            std::hint::spin_loop();
        }

        if quitting.load(Ordering::SeqCst) {
            break;
        }
        range.iter_mut().for_each(|value| *value += 1.0);

        finish_barrier.wait();
        while !finishing.load(Ordering::SeqCst) && !quitting.load(Ordering::SeqCst) {
            std::hint::spin_loop();
        }
    }
    println!("{:?} Joining", thread::current().id());
}
/// Runs the barrier/atomic-flag experiment: four `do_work` tasks, ten rounds.
///
/// `my_data` holds 1024 values initialised to their own index and is split
/// into `num_threads` chunks, one per worker. Each round the coordinator
/// 1. meets the workers on `start_barrier`,
/// 2. clears `finishing` and raises `starting` so the workers begin,
/// 3. meets them again on `finish_barrier` once their work is done,
/// 4. on the last round raises `quitting`,
/// 5. raises `finishing` and clears `starting`, then prints the round time.
///
/// After the scope returns, the data is printed.
///
/// # Panics
/// Panics if the rayon thread pool cannot be built.
fn main() {
    let num_threads = 4;
    let start_barrier = Barrier::new(num_threads + 1);
    let finish_barrier = Barrier::new(num_threads + 1);
    let mut my_data = vec![0.0; 1024];
    my_data
        .iter_mut()
        .enumerate()
        .for_each(|(i, iter)| *iter = i as f32);
    // 1024 / 4 divides evenly, so `chunks_mut` yields exactly `num_threads`
    // chunks, matching the barrier size.
    let chunk_size = my_data.len() / num_threads;
    let quitting = AtomicBool::new(false);
    let starting = AtomicBool::new(false);
    let finishing = AtomicBool::new(false);

    // The coordinator closure and all workers run on pool threads and block
    // on std barriers. The default global pool is sized to the CPU count, so
    // with fewer than `num_threads + 1` threads the barrier could never fill.
    // A dedicated pool guarantees one thread per participant.
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(num_threads + 1)
        .build()
        .expect("failed to build the rayon thread pool");

    pool.scope(|s| {
        for chunk in my_data.chunks_mut(chunk_size) {
            let thread_start_barrier = &start_barrier;
            let thread_finish_barrier = &finish_barrier;
            let thread_quitting = &quitting;
            let thread_starting = &starting;
            let thread_finishing = &finishing;
            s.spawn(move |_| {
                do_work(
                    thread_start_barrier,
                    thread_finish_barrier,
                    thread_quitting,
                    thread_starting,
                    thread_finishing,
                    chunk,
                )
            });
        }

        let num_rounds = 10;
        for i in 0..num_rounds {
            let start = std::time::Instant::now();
            start_barrier.wait();
            finishing.store(false, Ordering::SeqCst);
            starting.store(true, Ordering::SeqCst);
            finish_barrier.wait();
            if i == num_rounds - 1 {
                // Raised before `finishing` so workers leaving their final
                // spin see the shutdown request.
                quitting.store(true, Ordering::SeqCst);
            }
            finishing.store(true, Ordering::SeqCst);
            starting.store(false, Ordering::SeqCst);
            println!("Round {} took: {:?}", i, start.elapsed());
        }
    });
    println!("my_data is: {:?}", my_data);
}