使用 crossbeam 通道和作用域线程的多线程代码无休止地卡住
Multi-threaded code that uses crossbeam channel and scoped threads stuck endlessly
我有以下代码使用容量 (20) 小于我想通过交叉束通道发送的数据总量 (32) 的有界通道。我的目标是使用多个发送线程 (8) 和一定数量 (4) 的数字,每个线程通过交叉束通道发送给单个接收者,并使所有这些并行发生以优化效率。这只是我试图解决的一个更大问题的一个小原型。但是,我的代码导致程序无休止地卡住,永远不会 exit/timeout。我也知道它为什么会发生 - r.iter()
阻塞直到发件人被丢弃,这只发生在范围之外。我尝试了各种无效的方法:
用每个线程克隆发件人然后删除它们(如您在评论中所见)
接收器代码在范围之外,但这只是让最终向量包含 20 个长度而不是所需的 32 个长度。
fn main() {
use crossbeam_channel::{unbounded, bounded};
use crossbeam_utils::thread::scope;
use itertools::Itertools;
let (s, r) = bounded(20);
let mut v = vec![];
scope(|scope| {
scope.spawn(|_| {
for data in r.iter() {
v.push(data);
}
});
let _sender_threads = (0..8)
.into_iter()
.map(|_| {
scope.spawn(|_| {
// let s_clone = s.clone();
for i in 0..4 {
// std::thread::sleep(std::time::Duration::from_millis(100));
match s.send(i) {
Ok(_) => {
// println!("sent i {:?}", i);
()
},
Err(_)=> {
// println!("error sending i {:?}", i);
()
}
};
}
// drop(s_clone);
})
})
.collect_vec();
}).expect("scope error.");
drop(s);
println!("{:?}", v);
}
发生这种情况是因为 s
在范围结束之前不会被删除,但是在所有线程退出之前范围不会结束,并且调用 r.iter()
的线程直到s
被丢弃。这是典型的死锁场景。
您需要将 s
放入范围内,但您只能在发送方线程全部退出后才能这样做,因此您不能像目前这样 drop(s);
在范围内写了。
解决此问题的最简单方法是为每个发送方线程克隆 s
并将克隆移动到线程的闭包中,然后将 s
放入主范围。
fn main() {
use crossbeam_channel::{unbounded, bounded};
use crossbeam_utils::thread::scope;
use itertools::Itertools;
let (s, r) = bounded(20);
let mut v = vec![];
scope(|scope| {
scope.spawn(|_| {
for data in r.iter() {
v.push(data);
}
});
let _sender_threads = (0..8)
.into_iter()
.map(|_| {
// ** Clone the sender, move it into the thread:
let s = s.clone();
scope.spawn(move |_| {
// let s_clone = s.clone();
for i in 0..4 {
// std::thread::sleep(std::time::Duration::from_millis(100));
match s.send(i) {
Ok(_) => {
// println!("sent i {:?}", i);
()
},
Err(_)=> {
// println!("error sending i {:?}", i);
()
}
};
}
// drop(s_clone);
})
})
.collect_vec();
// ** Drop the remaining sender.
drop(s);
}).expect("scope error.");
println!("{:?}", v);
}
注意添加 let s = s.clone();
并通过添加 move
对以下闭包进行更改,以便闭包获得克隆的所有权。然后我们将 drop(s)
移入范围。现在,一旦所有发送方线程都退出,通道的发送方将关闭,接收方的 for
循环将终止。
我有以下代码使用容量 (20) 小于我想通过交叉束通道发送的数据总量 (32) 的有界通道。我的目标是使用多个发送线程 (8) 和一定数量 (4) 的数字,每个线程通过交叉束通道发送给单个接收者,并使所有这些并行发生以优化效率。这只是我试图解决的一个更大问题的一个小原型。但是,我的代码导致程序无休止地卡住,永远不会 exit/timeout。我也知道它为什么会发生 - r.iter()
阻塞直到发件人被丢弃,这只发生在范围之外。我尝试了各种无效的方法:
用每个线程克隆发件人然后删除它们(如您在评论中所见)
接收器代码在范围之外,但这只是让最终向量包含 20 个长度而不是所需的 32 个长度。
fn main() {
use crossbeam_channel::{unbounded, bounded};
use crossbeam_utils::thread::scope;
use itertools::Itertools;
let (s, r) = bounded(20);
let mut v = vec![];
scope(|scope| {
scope.spawn(|_| {
for data in r.iter() {
v.push(data);
}
});
let _sender_threads = (0..8)
.into_iter()
.map(|_| {
scope.spawn(|_| {
// let s_clone = s.clone();
for i in 0..4 {
// std::thread::sleep(std::time::Duration::from_millis(100));
match s.send(i) {
Ok(_) => {
// println!("sent i {:?}", i);
()
},
Err(_)=> {
// println!("error sending i {:?}", i);
()
}
};
}
// drop(s_clone);
})
})
.collect_vec();
}).expect("scope error.");
drop(s);
println!("{:?}", v);
}
发生这种情况是因为 s
在范围结束之前不会被删除,但是在所有线程退出之前范围不会结束,并且调用 r.iter()
的线程直到s
被丢弃。这是典型的死锁场景。
您需要将 s
放入范围内,但您只能在发送方线程全部退出后才能这样做,因此您不能像目前这样 drop(s);
在范围内写了。
解决此问题的最简单方法是为每个发送方线程克隆 s
并将克隆移动到线程的闭包中,然后将 s
放入主范围。
fn main() {
use crossbeam_channel::{unbounded, bounded};
use crossbeam_utils::thread::scope;
use itertools::Itertools;
let (s, r) = bounded(20);
let mut v = vec![];
scope(|scope| {
scope.spawn(|_| {
for data in r.iter() {
v.push(data);
}
});
let _sender_threads = (0..8)
.into_iter()
.map(|_| {
// ** Clone the sender, move it into the thread:
let s = s.clone();
scope.spawn(move |_| {
// let s_clone = s.clone();
for i in 0..4 {
// std::thread::sleep(std::time::Duration::from_millis(100));
match s.send(i) {
Ok(_) => {
// println!("sent i {:?}", i);
()
},
Err(_)=> {
// println!("error sending i {:?}", i);
()
}
};
}
// drop(s_clone);
})
})
.collect_vec();
// ** Drop the remaining sender.
drop(s);
}).expect("scope error.");
println!("{:?}", v);
}
注意添加 let s = s.clone();
并通过添加 move
对以下闭包进行更改,以便闭包获得克隆的所有权。然后我们将 drop(s)
移入范围。现在,一旦所有发送方线程都退出,通道的发送方将关闭,接收方的 for
循环将终止。