在 Rust 中实现线程安全函数对象

implement thread safe function object in rust

我正在尝试在 rust lang 中实现一些稳定性模式(在 cloud-native-go 中引入,例如 https://github.com/cloud-native-go/examples/blob/main/ch04/circuitbreaker.go)。 基本思想是将一个函数包装在一个闭包中,它保留了一些关于函数调用的状态。我也希望我的实现是线程安全的。

我终于想到了这样的东西:

use std::sync::Arc;
use std::sync::Mutex;
use std::thread;

#[derive(PartialEq, Debug, Clone)]
pub(crate) enum Error {
    UnReachable,
    CircuitError,
}

pub(crate) type Circuit = Arc<Mutex<dyn Fn() -> Result<String, Error>>>;

pub(crate) fn fail_after(threshold: usize) -> Circuit {
    let cnt = Arc::new(Mutex::new(0));
    let f = move || {
        let mut c = cnt.lock().unwrap();
        *c += 1;
        if *c > threshold {
            return Err(Error::CircuitError);
        } else {
            return Ok("ok".to_owned());
        }
    };
    return Arc::new(Mutex::new(f));
}

#[test]
fn test_debounce_first() {
    let c = fail_after(1);
    assert!(c.lock().unwrap()().is_ok());
    assert!(c.lock().unwrap()().is_err());
}


#[test]
fn test_debounce_first_data_race() {
    let c = fail_after(1);
    let mut results = vec![];
    for _ in 0..10 {
        let d1 = Arc::clone(&c);
        let h = thread::spawn(move || {
            let r: Result<String, Error> = d1.lock().unwrap()();
            r
        });
        results.push(h)
    }

    for result in results {
        let r = result.join().unwrap();
        assert!(r.is_ok())
    }
}

(rust playground)

多线程测试编译报错:

error[E0277]: `(dyn Fn() -> Result<String, Error> + 'static)` cannot be sent between threads safely
   --> src/lib.rs:41:17
    |
41  |         let h = thread::spawn(move || {
    |                 ^^^^^^^^^^^^^ `(dyn Fn() -> Result<String, Error> + 'static)` cannot be sent between threads safely
    |
    = help: the trait `Send` is not implemented for `(dyn Fn() -> Result<String, Error> + 'static)`
    = note: required because of the requirements on the impl of `Sync` for `Mutex<(dyn Fn() -> Result<String, Error> + 'static)>`
    = note: required because of the requirements on the impl of `Send` for `Arc<Mutex<(dyn Fn() -> Result<String, Error> + 'static)>>`
    = note: required because it appears within the type `[closure@src/lib.rs:41:31: 44:10]`
note: required by a bound in `spawn`

我想知道如何解决这个问题?我已经用 ArcMutex 包装了函数对象,为什么编译器仍然报错?

简答:将 dyn Fn() -> Result<String, Error> 更改为 dyn Fn() -> Result<String, Error> + Send + 'static。该类型的原始定义不保证 dyn Fn 存在多长时间或它是否可以在线程之间安全地发送。

另一个注意事项,您可能需要进一步将其修改为 Box<dyn Fn() -> Result<String, Error> + Send + 'static>,因为我看到您正在使用 new 构建 Mutex要求它包含的类型为 Sized,但是 dyn Trait 未调整大小。我从未尝试过使用未确定大小的类型构造互斥体,因此您可能需要进一步研究。

编辑

您不需要装箱 dyn Fn() -> _,因为 Arc<Mutex<_>> 最初是用具体的闭包构建的,但随后由于第三个一揽子实施而强制转换为未调整大小的 Mutex<dyn Fn() -> _>提及 here and Arc's implementation,共 CoerceUnsized