Rust:安全地终止线程池中的 运行 个线程?
Rust: Safely terminate running threads inside a ThreadPool?
我按照官方 Rust 文档向您展示了如何编写多线程 HTTP 服务器。他们将引导您构建一个由各个工作人员支持的线程池,其中每个工作人员都有自己的线程。线程池将执行 Job
个实例并通过 MPSC 实现将它们传递给工作人员。
根据文档,这就是代码的样子,完整的指南是 here。
struct Worker {
id: usize,
thread: thread::JoinHandle<()>
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Receiver {} received a job.", id);
job();
});
Worker { id, thread }
}
fn get_thread(&self) -> &thread::JoinHandle<()> {
return &self.thread;
}
}
我想添加一个附加功能:shutdown()
到我的 Worker 实现中,即
fn shutdown(&mut self) {
&self.thread.join();
}
我会从主线程调用它,从而在程序结束执行之前等待所有线程完成。但是,上面的关闭功能给我这个错误:
error: src/main.rs:31: cannot move out of `self.thread` which is behind a mutable reference
error: src/main.rs:31: move occurs because `self.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
我宣布自己为&mut self
,因为我将成为突变self.thread。我忽略了什么?
我认为它告诉我通过调用&self.thread.join()
,我将销毁属于这个工作实例的thread::JoinHandle<()>
,我不允许这样做?那我怎么杀这个帖子呢?
其次,我认识到我的线程有一个无限循环,所以从技术上讲,在此线程上调用 join 只会导致主线程挂起。我应该如何断线?将共享引用传递给我的 Worker 并让 Worker 检查它的值,从循环中跳出?
所以首先,我猜你有一个 ThreadPool
结构。您需要为该结构实现 Drop
特征,以便在整个 ThreadPool
被删除时完成所有线程的连接,您不能真正在每个工作人员的基础上这样做。
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
take()
调用在这里很重要,因为我们要取出选项的 Some
变体。如果线程已经是 None
,我们就不需要调用 join.
以任何语言关闭线程池都需要所有线程确认一些中央信号,而不仅仅是一个线程。如果这是 C++,您可能会检查线程工作函数顶部的原子布尔值,如果为真,则 return。虽然你可以在 Rust 中做到这一点,但我假设你想遵循一种更接近本书中介绍的项目的方法,即消息。
您想在频道中使用消息类型而不是工作,这样您就有两个变体,工作消息和终止消息。
type Job = Box<dyn FnOnce() + Send + 'static>;
enum Message {
NewJob(Job),
Terminate,
}
而你的 ThreadPool
将是
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Message>,
}
那么,你的 worker 函数看起来更像这样,类似于 this section of the book
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
let thread = thread::spawn(move || loop {
let message = receiver.lock().unwrap().recv().unwrap();
match message {
Message::NewJob(job) => {
println!("Worker {} got a job; executing.", id);
job();
}
Message::Terminate => {
println!("Worker {} was told to terminate.", id);
break;
}
}
});
现在,您还提到您的程序挂起,无法从外部将其关闭。使用信号处理箱可能比一开始尝试自己实现更容易。一种选择是 singal_hook
箱子:
use signal_hook::{iterator::Signals, SIGINT};
use std::{error::Error, thread, time::Duration};
fn main() -> Result<(), Box<dyn Error>> {
let signals = Signals::new(&[SIGINT])?;
thread::spawn(move || {
for sig in signals.forever() {
drop(pool);
}
});
Ok(())
}
这将在您在键盘上按 ctrl+c 时终止程序
我按照官方 Rust 文档向您展示了如何编写多线程 HTTP 服务器。他们将引导您构建一个由各个工作人员支持的线程池,其中每个工作人员都有自己的线程。线程池将执行 Job
个实例并通过 MPSC 实现将它们传递给工作人员。
根据文档,这就是代码的样子,完整的指南是 here。
struct Worker {
id: usize,
thread: thread::JoinHandle<()>
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Receiver {} received a job.", id);
job();
});
Worker { id, thread }
}
fn get_thread(&self) -> &thread::JoinHandle<()> {
return &self.thread;
}
}
我想添加一个附加功能:shutdown()
到我的 Worker 实现中,即
fn shutdown(&mut self) {
&self.thread.join();
}
我会从主线程调用它,从而在程序结束执行之前等待所有线程完成。但是,上面的关闭功能给我这个错误:
error: src/main.rs:31: cannot move out of `self.thread` which is behind a mutable reference
error: src/main.rs:31: move occurs because `self.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
我宣布自己为&mut self
,因为我将成为突变self.thread。我忽略了什么?
我认为它告诉我通过调用&self.thread.join()
,我将销毁属于这个工作实例的thread::JoinHandle<()>
,我不允许这样做?那我怎么杀这个帖子呢?
其次,我认识到我的线程有一个无限循环,所以从技术上讲,在此线程上调用 join 只会导致主线程挂起。我应该如何断线?将共享引用传递给我的 Worker 并让 Worker 检查它的值,从循环中跳出?
所以首先,我猜你有一个 ThreadPool
结构。您需要为该结构实现 Drop
特征,以便在整个 ThreadPool
被删除时完成所有线程的连接,您不能真正在每个工作人员的基础上这样做。
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
take()
调用在这里很重要,因为我们要取出选项的 Some
变体。如果线程已经是 None
,我们就不需要调用 join.
以任何语言关闭线程池都需要所有线程确认一些中央信号,而不仅仅是一个线程。如果这是 C++,您可能会检查线程工作函数顶部的原子布尔值,如果为真,则 return。虽然你可以在 Rust 中做到这一点,但我假设你想遵循一种更接近本书中介绍的项目的方法,即消息。
您想在频道中使用消息类型而不是工作,这样您就有两个变体,工作消息和终止消息。
type Job = Box<dyn FnOnce() + Send + 'static>;
enum Message {
NewJob(Job),
Terminate,
}
而你的 ThreadPool
将是
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Message>,
}
那么,你的 worker 函数看起来更像这样,类似于 this section of the book
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
let thread = thread::spawn(move || loop {
let message = receiver.lock().unwrap().recv().unwrap();
match message {
Message::NewJob(job) => {
println!("Worker {} got a job; executing.", id);
job();
}
Message::Terminate => {
println!("Worker {} was told to terminate.", id);
break;
}
}
});
现在,您还提到您的程序挂起,无法从外部将其关闭。使用信号处理箱可能比一开始尝试自己实现更容易。一种选择是 singal_hook
箱子:
use signal_hook::{iterator::Signals, SIGINT};
use std::{error::Error, thread, time::Duration};
fn main() -> Result<(), Box<dyn Error>> {
let signals = Signals::new(&[SIGINT])?;
thread::spawn(move || {
for sig in signals.forever() {
drop(pool);
}
});
Ok(())
}
这将在您在键盘上按 ctrl+c 时终止程序