这是在 Rust 线​​程之间共享闭包回调的惯用方式吗?

Is this the idiomatic way to share a closure callback among threads in rust?

我想将回调函数 do_something 传递给函数 new,该函数又会创建多个需要回调的线程(此处通过调用 LibThreaded 中的函数) .例如,如果我有一个 lib 在线程中接收套接字消息,然后调用回调对它们执行某些操作,就会发生这种情况。回调本身可能会从另一个库 OtherLib 调用代码,为此必须实现 Clone 特性。

我想出了一个似乎可以工作但看起来过于复杂的版本。这真的是 correct/best 共享回调的方式吗?是否可以通过其他方式解除 do_something 函数的 Clone 特征要求?

#![feature(async_await)]
#![warn(rust_2018_idioms)]
use std::sync::{Arc, Mutex};
use std::error::Error;
use tokio::{runtime::Runtime};

#[derive(Clone)]
struct OtherLib { }

impl OtherLib {
    pub fn do_something(&self, text1: String, text2: String) {
        println!("doing something in other lib: {} + {}", text1, text2);
    }
}

type Callback = Arc<Mutex<Box<dyn 'static + FnMut(String, String) + Send + Sync>>>;

struct LibThreaded {
    something_threaded: String,
    callback: Callback,
}

impl LibThreaded {
    pub fn new(callback: Option<impl 'static + FnMut(String, String) + Send + Sync + Clone>) -> LibThreaded {
        if callback.is_some() {
            LibThreaded { something_threaded: "I am in a thread: ".to_string(), callback: Arc::new(Mutex::new(Box::new(callback.unwrap()))) }
        } else {
            LibThreaded { something_threaded: "I am in a thread: ".to_string(), callback: Arc::new(Mutex::new(Box::new(|_,_| {}))) }
        }

    }

    async fn receiving(&mut self) {
        println!("in receiving loop");
            let c = &mut *self.callback.lock().unwrap();
            (c)(self.something_threaded.clone(), "hello world".to_string());
    }
}

struct Lib {
    something: String,
    callback: Callback,
}

impl Lib {
    pub fn new() -> Lib {
        Lib { something: "I am lib: ".to_string(), callback: Arc::new(Mutex::new(Box::new(|_, _| {}))) }
    }

    pub fn set_callback(&mut self, callback: Option<impl 'static + FnMut(String, String) + Send + Sync + Clone>) {
        println!("in lib2");
        if callback.is_some() {
            self.callback = Arc::new(Mutex::new(Box::new(callback.clone().unwrap())));
            let c = &mut *self.callback.lock().unwrap();
            (c)(self.something.clone(), "hello world".to_string());
        }
        let mut t = LibThreaded::new(callback);

        tokio::spawn(async move {
            t.receiving().await;
        });
    }
}

fn main() -> Result<(), Box<dyn Error>> {
    let ol = OtherLib {};

    let callback = move |text1: String, text2: String| {
            ol.do_something(text1, text2);
    };

    let rt = Runtime::new()?;
    rt.block_on(async {
        let mut lib = Lib::new();
        lib.set_callback(Some(callback));
    });
    rt.shutdown_on_idle();
    Ok(())
}

使用上面的程序,我得到了正确的输出:

in lib2
doing something in other lib: I am lib:  + hello world
in receiving loop
doing something in other lib: I am in a thread:  + hello world

我想知道是否有没有 Arc<Mutex<Box... 且不对 fn do_something 施加额外要求的更简单的解决方案。感谢您的帮助!

编辑版本: 感谢下面 comments/answers 的帮助,我有以下工作代码(请参阅 rodrigo 对第 1 行和第 2 行的评论):

#![feature(async_await)]
#![warn(rust_2018_idioms)]
use std::sync::{Arc, Mutex};
use std::error::Error;
use tokio::{runtime::Runtime};

#[derive(Clone)]
struct OtherLib { }

impl OtherLib {
    pub fn do_something(&self, text1: String, text2: String) {
        println!("doing something in other lib: {} + {}", text1, text2);
    }
}

type Callback = Arc<Mutex<dyn 'static + FnMut(String, String) + Send + Sync>>;

struct LibThreaded {
    something_threaded: String,
    callback: Callback,
}

impl LibThreaded {
    pub fn new(callback: Option<Callback>) -> LibThreaded {
        LibThreaded {
            something_threaded: "I am in a thread: ".to_string(),
            callback: callback.unwrap_or_else(|| Arc::new(Mutex::new(|_,_| {})))
        }
    }

    async fn receiving(&mut self) {
        println!("in receiving loop");
            let c = &mut *self.callback.lock().unwrap();
            (c)(self.something_threaded.clone(), "hello world".to_string());
    }
}

struct Lib {
    something: String,
    callback: Callback,
}

impl Lib {
    pub fn new() -> Lib {
        Lib { something: "I am lib: ".to_string(), callback: Arc::new(Mutex::new(|_, _| {})) }
    }

    pub async fn set_callback(&mut self, callback: Option<impl 'static + FnMut(String, String) + Send + Sync>) {
        println!("in lib2");
        let callback = callback.map(|cb| Arc::new(Mutex::new(cb)) as Callback); //line 1
        if let Some(cb) = &callback {  //line 2
            self.callback = cb.clone();
            let c = &mut *self.callback.lock().unwrap();
            (c)(self.something.clone(), "hello world".to_string());
        }
        let mut t = LibThreaded::new(callback);

        tokio::spawn(async move {
            t.receiving().await;
        });
    }
}

fn main() -> Result<(), Box<dyn Error>> {
    let ol = OtherLib {};

    let callback = move |text1: String, text2: String| {
            ol.do_something(text1, text2);
    };

    let rt = Runtime::new()?;
    rt.block_on(async {
        let mut lib = Lib::new();
        lib.set_callback(Some(callback)).await;
    });
    rt.shutdown_on_idle();
    Ok(())
}

让我重写有趣的代码片段,因为我理解惯用的 Rust:

首先,LibThreaded::new 可以通过调用 unwrap_or_else 轻松重写:

pub fn new(callback: Option<Callback>) -> LibThreaded {
    LibThreaded {
        something_threaded: "I am in a thread: ".to_string(),
        callback: callback.unwrap_or_else(|| Arc::new(Mutex::new(|_,_| {})))
    }
}

您也可以使用 Option::unwrap_or,但这种方式更好,因为您可以延迟分配 Mutex,也就是说,如果 OptionSome,则不会花费您任何费用.

然后 Lib::set_callback 可以通过一些更改来改进:首先删除 Clone 要求;然后使用 if let Some(...) 而不是 is_some();最后尽快将回调转换为 Callback,以便可以克隆它:

pub async fn set_callback(&mut self, callback: Option<impl FnMut(String, String) + Send + Sync + 'static>) {
    let callback = callback.map(|cb| Arc::new(Mutex::new(cb)) as Callback); //line 1
    if let Some(cb) = &callback {  //line 2
        self.callback = cb.clone();
        let c = &mut *self.callback.lock().unwrap();
        (c)(self.something.clone(), "hello world".to_string());
    }
    let mut t = LibThreaded::new(callback);

    //...
}

有几行值得补充评论:

第 1 行:Option 中的值被替换为 Option::map。如果我们天真地这样做,callback.map(|cb| Arc::new(Mutex::new(cb))); 我们会得到一个 Option<impl FnMut...> 而不是 Option<dyn FnMut>。幸运的是,我们可以将 Arc<impl T> 强制转换为 Arc<dyn T>,因此我们可以在方便的类型别名的帮助下做到这一点。

第 2 行:您可以通过多种方式执行此操作。您也可以写 if let Some(cb) = callback.clone()Option<T:Clone> 也是 Clone)或 if let Some(ref cb) = callback。我个人更喜欢我写的方式。思路是不要在这个block中消耗callback,这样以后可以重用