如何遍历线程句柄并在完成后加入另一个循环?
How to loop over thread handles and join if finished, within another loop?
我有一个在循环中创建线程的程序,还检查它们是否已完成并在完成时清理它们。请参阅下面的最小示例:
use std::thread;
fn main() {
let mut v = Vec::<std::thread::JoinHandle<()>>::new();
for _ in 0..10 {
let jh = thread::spawn(|| {
thread::sleep(std::time::Duration::from_secs(1));
});
v.push(jh);
for jh in v.iter_mut() {
if jh.is_finished() {
jh.join().unwrap();
}
}
}
}
这给出了错误:
error[E0507]: cannot move out of `*jh` which is behind a mutable reference
--> src\main.rs:13:17
|
13 | jh.join().unwrap();
| ^^^------
| | |
| | `*jh` moved due to this method call
| move occurs because `*jh` has type `JoinHandle<()>`, which does not implement the `Copy` trait
|
note: this function takes ownership of the receiver `self`, which moves `*jh`
--> D:\rust\.rustup\toolchains\stable-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\std\src\thread\mod.rs:1461:17
|
1461 | pub fn join(self) -> Result<T> {
我怎样才能让借阅检查器允许这样做?
JoinHandle::join
实际上 消耗 JoinHandle。
iter_mut()
,但是,只是借用了vector的元素,让vector保持存活。因此你的JoinHandle
只是借来的,你不能调用借来的对象的消费方法。
您需要做的是在遍历向量时获取元素的所有权,以便 join()
可以使用它们。这是通过使用 into_iter()
而不是 iter_mut()
.
来实现的
第二个错误是您(可能是不小心)将两个 for
循环写在彼此内部,而它们应该是独立的循环。
第三个问题稍微复杂一点。您不能 检查 线程是否已完成,然后按照您的方式加入它。因此,我暂时删除了 is_finished()
检查,以后会再讨论这个问题。
这是您的固定代码:
use std::thread;
fn main() {
let mut v = Vec::<std::thread::JoinHandle<()>>::new();
for _ in 0..10 {
let jh = thread::spawn(|| {
thread::sleep(std::time::Duration::from_secs(1));
});
v.push(jh);
}
for jh in v.into_iter() {
jh.join().unwrap();
}
}
对完成的线程作出反应
这个比较难。如果你只是想等到 所有 完成,上面的代码就是方法。
但是,如果您立即对完成的线程作出反应,您基本上必须设置某种事件传播。您不想一遍又一遍地遍历所有线程,直到它们全部完成,因为这称为 idle-waiting 并且会消耗大量计算能力。
所以如果你想实现这个,有两个问题需要解决:
join()
消耗了 JoinHandle()
,这将留下 JoinHandle
的不完整 Vec
。这是不可能的,所以我们需要将 JoinHandle
包装在一个实际上可以部分从向量中剥离出来的类型中,比如 Option
.
- 我们需要一种方法来向主线程发出新子线程已完成的信号,这样主线程就不必不断地迭代线程。
总而言之,这非常复杂且难以实施。
这是我的尝试:
use std::{
thread::{self, JoinHandle},
time::Duration,
};
fn main() {
let mut v: Vec<Option<JoinHandle<()>>> = Vec::new();
let (send_finished_thread, receive_finished_thread) = std::sync::mpsc::channel();
for i in 0..10 {
let send_finished_thread = send_finished_thread.clone();
let join_handle = thread::spawn(move || {
println!("Thread {} started.", i);
thread::sleep(Duration::from_millis(2000 - i as u64 * 100));
println!("Thread {} finished.", i);
// Signal that we are finished.
// This will wake up the main thread.
send_finished_thread.send(i).unwrap();
});
v.push(Some(join_handle));
}
loop {
// Check if all threads are finished
let num_left = v.iter().filter(|th| th.is_some()).count();
if num_left == 0 {
break;
}
// Wait until a thread is finished, then join it
let i = receive_finished_thread.recv().unwrap();
let join_handle = std::mem::take(&mut v[i]).unwrap();
println!("Joining {} ...", i);
join_handle.join().unwrap();
println!("{} joined.", i);
}
println!("All joined.");
}
重要
这段代码只是一个演示。它 将死锁 如果其中一个线程出现恐慌。但这表明这个问题有多么复杂。
它可以通过使用防坠落装置来解决,但我认为这个答案已经足够令人费解了;)
我有一个在循环中创建线程的程序,还检查它们是否已完成并在完成时清理它们。请参阅下面的最小示例:
use std::thread;
fn main() {
let mut v = Vec::<std::thread::JoinHandle<()>>::new();
for _ in 0..10 {
let jh = thread::spawn(|| {
thread::sleep(std::time::Duration::from_secs(1));
});
v.push(jh);
for jh in v.iter_mut() {
if jh.is_finished() {
jh.join().unwrap();
}
}
}
}
这给出了错误:
error[E0507]: cannot move out of `*jh` which is behind a mutable reference
--> src\main.rs:13:17
|
13 | jh.join().unwrap();
| ^^^------
| | |
| | `*jh` moved due to this method call
| move occurs because `*jh` has type `JoinHandle<()>`, which does not implement the `Copy` trait
|
note: this function takes ownership of the receiver `self`, which moves `*jh`
--> D:\rust\.rustup\toolchains\stable-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\std\src\thread\mod.rs:1461:17
|
1461 | pub fn join(self) -> Result<T> {
我怎样才能让借阅检查器允许这样做?
JoinHandle::join
实际上 消耗 JoinHandle。
iter_mut()
,但是,只是借用了vector的元素,让vector保持存活。因此你的JoinHandle
只是借来的,你不能调用借来的对象的消费方法。
您需要做的是在遍历向量时获取元素的所有权,以便 join()
可以使用它们。这是通过使用 into_iter()
而不是 iter_mut()
.
第二个错误是您(可能是不小心)将两个 for
循环写在彼此内部,而它们应该是独立的循环。
第三个问题稍微复杂一点。您不能 检查 线程是否已完成,然后按照您的方式加入它。因此,我暂时删除了 is_finished()
检查,以后会再讨论这个问题。
这是您的固定代码:
use std::thread;
fn main() {
let mut v = Vec::<std::thread::JoinHandle<()>>::new();
for _ in 0..10 {
let jh = thread::spawn(|| {
thread::sleep(std::time::Duration::from_secs(1));
});
v.push(jh);
}
for jh in v.into_iter() {
jh.join().unwrap();
}
}
对完成的线程作出反应
这个比较难。如果你只是想等到 所有 完成,上面的代码就是方法。
但是,如果您立即对完成的线程作出反应,您基本上必须设置某种事件传播。您不想一遍又一遍地遍历所有线程,直到它们全部完成,因为这称为 idle-waiting 并且会消耗大量计算能力。
所以如果你想实现这个,有两个问题需要解决:
join()
消耗了JoinHandle()
,这将留下JoinHandle
的不完整Vec
。这是不可能的,所以我们需要将JoinHandle
包装在一个实际上可以部分从向量中剥离出来的类型中,比如Option
.- 我们需要一种方法来向主线程发出新子线程已完成的信号,这样主线程就不必不断地迭代线程。
总而言之,这非常复杂且难以实施。
这是我的尝试:
use std::{
thread::{self, JoinHandle},
time::Duration,
};
fn main() {
let mut v: Vec<Option<JoinHandle<()>>> = Vec::new();
let (send_finished_thread, receive_finished_thread) = std::sync::mpsc::channel();
for i in 0..10 {
let send_finished_thread = send_finished_thread.clone();
let join_handle = thread::spawn(move || {
println!("Thread {} started.", i);
thread::sleep(Duration::from_millis(2000 - i as u64 * 100));
println!("Thread {} finished.", i);
// Signal that we are finished.
// This will wake up the main thread.
send_finished_thread.send(i).unwrap();
});
v.push(Some(join_handle));
}
loop {
// Check if all threads are finished
let num_left = v.iter().filter(|th| th.is_some()).count();
if num_left == 0 {
break;
}
// Wait until a thread is finished, then join it
let i = receive_finished_thread.recv().unwrap();
let join_handle = std::mem::take(&mut v[i]).unwrap();
println!("Joining {} ...", i);
join_handle.join().unwrap();
println!("{} joined.", i);
}
println!("All joined.");
}
重要
这段代码只是一个演示。它 将死锁 如果其中一个线程出现恐慌。但这表明这个问题有多么复杂。
它可以通过使用防坠落装置来解决,但我认为这个答案已经足够令人费解了;)