如何正确退出 mpsc::Receiver 上的线程阻塞
How to correctly exit the thread blocking on mpsc::Receiver
impl A {
fn new() -> (A, std::sync::mpsc::Receiver<Data>) {
let (sender, receiver) = std::sync::mpsc::channel();
let objA = A { sender: sender, }; // A spawns threads, clones and uses sender etc
(objA, receiver)
}
}
impl B {
fn new() -> B {
let (objA, receiver) = A::new();
B {
a: objA,
join_handle: Some(std::thread::spwan(move || {
loop {
match receiver.recv() {
Ok(data) => /* Do Something, inform main thread etc */,
Err(_) => break,
}
}
})),
}
}
}
impl Drop for B {
fn drop(&mut self) {
// Want to do something like "sender.close()/receiver.close()" etc so that the following
// thread joins. But there is no such function. How do i break the following thread ?
self.join_handle().take().unwrap().join().unwrap();
}
}
有没有办法在这种情况下干净退出?问题是,当接收者或发送者中的任何一个被丢弃时,另一个嗅探这个并给出错误。在 receiver
的情况下,它将被唤醒并产生错误,在这种情况下,我正在打破上面的无限循环和阻塞循环。但是,我如何明确地使用这个非常 属性 的通道,而不求助于其他标志与 try_recv()
等,并确定地干净地退出我的线程?
您可以将 B
类型的 a
字段包装在 Option
中。这样,在 Drop::drop
方法中,您可以执行 drop(self.a.take())
,这将用 None
替换该字段并删除发件人。这将关闭频道,您的话题现在可以正确加入了。
您可以创建一个新频道并将您的实际发件人换成虚拟发件人。然后你可以删除你的发件人并加入线程:
impl Drop for B {
fn drop(&mut self) {
let (s, _) = channel();
drop(replace(&mut self.a.sender, s));
self.join_handle.take().unwrap().join().unwrap();
}
}
在婴儿围栏里试试看:http://is.gd/y7A9L0
我不知道创建和立即删除频道的开销是多少,但它不是免费的,也不太可能被优化掉(那里有一个 Arc
)。
旁注,可以使用 Receiver::iter
方法将 receiver.recv()
上匹配的无限循环替换为 for 循环:
for _ in receiver.iter() {
// do something with the value
}
为什么不发送特定消息来关闭此线程?我不知道你的数据是什么,但大多数时候它可能是一个枚举,并在你的接收中添加一个像 'MyData::Shutdown' 这样的枚举变体,你可以简单地跳出循环。
impl A {
fn new() -> (A, std::sync::mpsc::Receiver<Data>) {
let (sender, receiver) = std::sync::mpsc::channel();
let objA = A { sender: sender, }; // A spawns threads, clones and uses sender etc
(objA, receiver)
}
}
impl B {
fn new() -> B {
let (objA, receiver) = A::new();
B {
a: objA,
join_handle: Some(std::thread::spwan(move || {
loop {
match receiver.recv() {
Ok(data) => /* Do Something, inform main thread etc */,
Err(_) => break,
}
}
})),
}
}
}
impl Drop for B {
fn drop(&mut self) {
// Want to do something like "sender.close()/receiver.close()" etc so that the following
// thread joins. But there is no such function. How do i break the following thread ?
self.join_handle().take().unwrap().join().unwrap();
}
}
有没有办法在这种情况下干净退出?问题是,当接收者或发送者中的任何一个被丢弃时,另一个嗅探这个并给出错误。在 receiver
的情况下,它将被唤醒并产生错误,在这种情况下,我正在打破上面的无限循环和阻塞循环。但是,我如何明确地使用这个非常 属性 的通道,而不求助于其他标志与 try_recv()
等,并确定地干净地退出我的线程?
您可以将 B
类型的 a
字段包装在 Option
中。这样,在 Drop::drop
方法中,您可以执行 drop(self.a.take())
,这将用 None
替换该字段并删除发件人。这将关闭频道,您的话题现在可以正确加入了。
您可以创建一个新频道并将您的实际发件人换成虚拟发件人。然后你可以删除你的发件人并加入线程:
impl Drop for B {
fn drop(&mut self) {
let (s, _) = channel();
drop(replace(&mut self.a.sender, s));
self.join_handle.take().unwrap().join().unwrap();
}
}
在婴儿围栏里试试看:http://is.gd/y7A9L0
我不知道创建和立即删除频道的开销是多少,但它不是免费的,也不太可能被优化掉(那里有一个 Arc
)。
旁注,可以使用 Receiver::iter
方法将 receiver.recv()
上匹配的无限循环替换为 for 循环:
for _ in receiver.iter() {
// do something with the value
}
为什么不发送特定消息来关闭此线程?我不知道你的数据是什么,但大多数时候它可能是一个枚举,并在你的接收中添加一个像 'MyData::Shutdown' 这样的枚举变体,你可以简单地跳出循环。