在后台作业(不同线程)中使用特征对象

Using a trait object in a background job (different thread)

我想要一个后台工作者,它使用一个特征实现/对象一段时间。只要被使用,后台工作者就拥有这个对象。后台工作者“销毁”后,该对象应该可以再次使用。

我尝试用 async/await 制作所有东西,但它产生了更多问题。因此,我使用普通线程来创建一个最小的示例。首先,我还使用 Box<&dyn mut...> 将对象传递给后台工作人员,但我认为甚至不需要。

我的最小示例包含一个 MyWriter-trait,它可以将字符串写入某处。存在一种将字符串写入标准输出的实现。后台工作人员将此作者用于后台工作。工作人员有一个开始工作的开始方法和一个加入工作的停止方法(在我的真实代码中,我会使用一个通道向工作人员发送一个停止信息然后加入)。

我会 post 代码,然后描述我的问题:

https://play.rust-lang.org/?version=nightly&mode=debug&edition=2021&gist=a01745c15ba1088acd2e3d287d60e270

use std::sync::Arc;
use std::sync::Mutex;
use std::thread::{spawn, JoinHandle};

/* Trait + an implementation */

trait MyWriter {
    fn write(&mut self, text: &str);
}

struct StdoutWriter {}

impl StdoutWriter {
    pub fn new() -> Self {
        Self {}
    }
}

impl MyWriter for StdoutWriter {
    fn write(&mut self, text: &str) {
        println!("{}", text);
    }
}

/* A background job which uses a "MyWriter" */

struct BackgroundJob<'a> {
    writer: Arc<Mutex<&'a dyn MyWriter>>,
    job: Option<JoinHandle<()>>,
}

impl<'a> BackgroundJob<'a> {
    pub fn new(writer: &'a mut dyn MyWriter) -> Self {
        Self {
            writer: Arc::new(Mutex::new(writer)),
            job: None,
        }
    }

    pub fn start(&mut self) {
        assert!(self.job.is_none());
        let writer = &self.writer;
        self.job = Some(std::thread::spawn(move || {
            // this background job uses "writer"
            let mut my_writer = writer.lock().unwrap();
            my_writer.write("x");
            // do something
            my_writer.write("y");
        }));
    }

    pub fn stop(&mut self) {
        if let Some(job) = self.job {
            job.join().unwrap();
            self.job = None;
        }
    }
}

/* Using BackgroundJob */

fn main() {
    let mut writer = StdoutWriter::new();
    writer.write("a");
    {
        let mut job = BackgroundJob::new(&mut writer);
        // inside this block, writer is owned by "job"
        job.start();
        job.stop();
    }
    // writer should be usable again
    writer.write("b");
}

stdout 上所需的输出是 a\nx\ny\nz\n,但程序甚至无法编译。我的主要问题是 (dyn MyWriter + 'a) cannot be shared between threads safely(编译器错误)。

如何为特征实现 Send / Sync?这似乎是不可能的。实际上,我认为如果对象(或引用)在 Arc<Mutex<...>> 内应该没问题,但这似乎还不够。为什么不呢?

也许有人知道如何解决这个问题,或者更重要的是根本问题到底是什么?

将引用放在 Arc 中是行不通的。由于 Arc 可以通过简单地克隆它而无限期地保持活动状态,因此引用很容易比借用它的任何东西都长寿,因此无法编译。您需要在 Arc 中放置一个拥有的对象,例如 Box<dyn MyWriter>。 (理想情况下,您只使用 Arc<dyn MyWriter>,但这会与 returning 来自 BackgroundJob 的作者发生冲突,如下所示。)

既然你不能从writer中借用main,你必须把它移到BackgroundJob中。但此时您已经放弃了对 writer 的所有权,并将值移至 BackgroundJob,因此您唯一的选择是 BackgroundJob return作家。然而,由于 BackgroundJob 将其编写者保留在特征对象后面,它只能返回它存储的 Box<dyn MyWriter>,而不是原始的 StdoutWriter.

这是这样工作的版本,保留类型擦除并返回类型擦除的写入器:

// Trait definition and StdoutWriter implementation unchanged

struct BackgroundJob {
    writer: Arc<Mutex<Box<dyn MyWriter + Send>>>,
    job: Option<JoinHandle<()>>,
}

impl BackgroundJob {
    pub fn new(writer: Box<dyn MyWriter + Send>) -> Self {
        Self {
            writer: Arc::new(Mutex::new(writer)),
            job: None,
        }
    }

    pub fn start(&mut self) {
        assert!(self.job.is_none());
        let writer = Arc::clone(&self.writer);
        self.job = Some(std::thread::spawn(move || {
            // this background job uses "writer"
            let mut my_writer = writer.lock().unwrap();
            my_writer.write("x");
            // do something
            my_writer.write("y");
        }));
    }

    pub fn stop(&mut self) {
        if let Some(job) = self.job.take() {
            job.join().unwrap();
        }
    }

    pub fn into_writer(self) -> Box<dyn MyWriter> {
        Arc::try_unwrap(self.writer)
            .unwrap_or_else(|_| panic!())
            .into_inner()
            .unwrap()
    }
}

fn main() {
    let mut writer = StdoutWriter::new();
    writer.write("a");
    let mut writer = {
        let mut job = BackgroundJob::new(Box::new(writer));
        job.start();
        job.stop();
        job.into_writer()
    };
    writer.write("b");
}

Playground

返回相同类型的编写器的版本将不得不放弃类型擦除并在编写器类型上通用。虽然有点复杂,但它的所有权语义与您最初设想的非常接近(至少在概念上):

struct BackgroundJob<W> {
    writer: Arc<Mutex<W>>,
    job: Option<JoinHandle<()>>,
}

impl<W: MyWriter + Send + 'static> BackgroundJob<W> {
    pub fn new(writer: W) -> Self {
        Self {
            writer: Arc::new(Mutex::new(writer)),
            job: None,
        }
    }

    // start() and stop() are unchanged

    pub fn into_writer(self) -> W {
        Arc::try_unwrap(self.writer)
            .unwrap_or_else(|_| panic!())
            .into_inner()
            .unwrap()
    }
}

fn main() {
    let mut writer = StdoutWriter::new();
    writer.write("a");
    {
        // inside this block, writer is moved into "job"
        let mut job = BackgroundJob::new(writer);
        job.start();
        job.stop();
        // reclaim the writer
        writer = job.into_writer();
    }
    writer.write("b");
}

Playground

主要问题是您想传递对线程的引用。该方法的问题在于线程可能比引用的对象存在得更久。显然这不会发生在你的情况下,但 rust 编译器无法推理。

该问题的解决方案是使用 Arc<Mutex<dyn MyType>> 而不是 Arc<Mutex<&dyn MyType>> - 没有生命周期 - 没问题。

下一个问题是 Mutex<T> - 只有 T 可以跨线程发送它。所以你必须T,在你的情况下dyn MyType,实施Send。这可以通过两种方式完成:

  1. 使 MyType 需要 Send - 在这种情况下,特征只能由 Send 对象实现:
trait MyWriter : Send{
    fn write(&mut self, text: &str);
}
  1. 或者使用额外的特征边界 - 在这种情况下你的特征限制较少,但是当你想跨线程发送它时你必须总是指定 MyTrait + Send:
Arc<Mutex<dyn MyWriter + Send>>

到目前为止一切顺利,但现在您的 new() 方法不起作用,因为 dyn MyWriter 不是 Sized。为了解决这个问题,您必须使您的方法通用:

    pub fn new<T: MyWriter + Send>(writer: T) -> Self {
        Self {
            writer: Arc::new(Mutex::new(writer)),
            job: None,
        }
    }

或者直接传一个Arc<Mutex<dyn MyWriter + Send>>:

    pub fn new(writer: Arc<Mutex<dyn MyWriter + Send>>) -> Self {
        Self { writer, job: None }
    }

完整的工作代码

use std::sync::Arc;
use std::sync::Mutex;
use std::thread::JoinHandle;

trait MyWriter {
    fn write(&mut self, text: &str);
}

struct StdoutWriter {}

impl StdoutWriter {
    pub fn new() -> Self {
        Self {}
    }
}

impl MyWriter for StdoutWriter {
    fn write(&mut self, text: &str) {
        println!("{}", text);
    }
}

/* A background job which uses a "MyWriter" */
struct BackgroundJob {
    writer: Arc<Mutex<dyn MyWriter + Send>>,
    job: Option<JoinHandle<()>>,
}

impl BackgroundJob {
    pub fn new(writer: Arc<Mutex<dyn MyWriter + Send>>) -> Self {
        Self { writer, job: None }
    }

    pub fn start(&mut self) {
        assert!(self.job.is_none());
        let writer = self.writer.clone();
        self.job = Some(std::thread::spawn(move || {
            let mut my_writer = writer.lock().unwrap();
            my_writer.write("x");
            // do something
            my_writer.write("y");
        }));
    }

    pub fn stop(&mut self) {
        if let Some(job) = self.job.take() {
            job.join().unwrap();
        }
    }
}

fn main() {
    let mut writer = StdoutWriter::new();
    writer.write("a");

    let writer = Arc::new(Mutex::new(writer));

    {
        let mut job = BackgroundJob::new(writer.clone());
        // inside this block, writer is owned by "job"
        job.start();
        job.stop();
    }
    // you have to acquire the lock in order to use the writer
    writer.lock().unwrap_or_else(|e| e.into_inner()).write("b");
}