Rust:安全地终止线程池中的 运行 个线程?

Rust: Safely terminate running threads inside a ThreadPool?

我按照官方 Rust 文档向您展示了如何编写多线程 HTTP 服务器。他们将引导您构建一个由各个工作人员支持的线程池,其中每个工作人员都有自己的线程。线程池将执行 Job 个实例并通过 MPSC 实现将它们传递给工作人员。

根据文档,这就是代码的样子,完整的指南是 here

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();
            println!("Receiver {} received a job.", id);
            job();
        });
        Worker { id, thread }
    }

    fn get_thread(&self) -> &thread::JoinHandle<()> {
        return &self.thread;
    }
}

我想添加一个附加功能:shutdown() 到我的 Worker 实现中,即

    fn shutdown(&mut self) {
        &self.thread.join();
    }

我会从主线程调用它,从而在程序结束执行之前等待所有线程完成。但是,上面的关闭功能给我这个错误:

error: src/main.rs:31: cannot move out of `self.thread` which is behind a mutable reference
error: src/main.rs:31: move occurs because `self.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait

我宣布自己为&mut self,因为我将成为突变self.thread。我忽略了什么?

认为它告诉我通过调用&self.thread.join(),我将销毁属于这个工作实例的thread::JoinHandle<()>,我不允许这样做?那我怎么杀这个帖子呢?

其次,我认识到我的线程有一个无限循环,所以从技术上讲,在此线程上调用 join 只会导致主线程挂起。我应该如何断线?将共享引用传递给我的 Worker 并让 Worker 检查它的值,从循环中跳出?

所以首先,我猜你有一个 ThreadPool 结构。您需要为该结构实现 Drop 特征,以便在整个 ThreadPool 被删除时完成所有线程的连接,您不能真正在每个工作人员的基础上这样做。

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

take() 调用在这里很重要,因为我们要取出选项的 Some 变体。如果线程已经是 None,我们就不需要调用 join.

以任何语言关闭线程池都需要所有线程确认一些中央信号,而不仅仅是一个线程。如果这是 C++,您可能会检查线程工作函数顶部的原子布尔值,如果为真,则 return。虽然你可以在 Rust 中做到这一点,但我假设你想遵循一种更接近本书中介绍的项目的方法,即消息。

您想在频道中使用消息类型而不是工作,这样您就有两个变体,工作消息和终止消息。

type Job = Box<dyn FnOnce() + Send + 'static>;
enum Message {
    NewJob(Job),
    Terminate,
}

而你的 ThreadPool 将是

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Message>,
}

那么,你的 worker 函数看起来更像这样,类似于 this section of the book

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let message = receiver.lock().unwrap().recv().unwrap();

            match message {
                Message::NewJob(job) => {
                    println!("Worker {} got a job; executing.", id);

                    job();
                }
                Message::Terminate => {
                    println!("Worker {} was told to terminate.", id);

                    break;
                }
            }
        });

现在,您还提到您的程序挂起,无法从外部将其关闭。使用信号处理箱可能比一开始尝试自己实现更容易。一种选择是 singal_hook 箱子:

use signal_hook::{iterator::Signals, SIGINT};
use std::{error::Error, thread, time::Duration};

fn main() -> Result<(), Box<dyn Error>> {
    let signals = Signals::new(&[SIGINT])?;

    thread::spawn(move || {
        for sig in signals.forever() {
            drop(pool);
        }
    });

    Ok(())
}

这将在您在键盘上按 ctrl+c 时终止程序