在 Rust 中实现线程安全函数对象
implement thread safe function object in rust
我正在尝试在 rust lang 中实现一些稳定性模式(在 cloud-native-go 中引入,例如 https://github.com/cloud-native-go/examples/blob/main/ch04/circuitbreaker.go)。
基本思想是将一个函数包装在一个闭包中,它保留了一些关于函数调用的状态。我也希望我的实现是线程安全的。
我终于想到了这样的东西:
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
#[derive(PartialEq, Debug, Clone)]
pub(crate) enum Error {
UnReachable,
CircuitError,
}
pub(crate) type Circuit = Arc<Mutex<dyn Fn() -> Result<String, Error>>>;
pub(crate) fn fail_after(threshold: usize) -> Circuit {
let cnt = Arc::new(Mutex::new(0));
let f = move || {
let mut c = cnt.lock().unwrap();
*c += 1;
if *c > threshold {
return Err(Error::CircuitError);
} else {
return Ok("ok".to_owned());
}
};
return Arc::new(Mutex::new(f));
}
#[test]
fn test_debounce_first() {
let c = fail_after(1);
assert!(c.lock().unwrap()().is_ok());
assert!(c.lock().unwrap()().is_err());
}
#[test]
fn test_debounce_first_data_race() {
let c = fail_after(1);
let mut results = vec![];
for _ in 0..10 {
let d1 = Arc::clone(&c);
let h = thread::spawn(move || {
let r: Result<String, Error> = d1.lock().unwrap()();
r
});
results.push(h)
}
for result in results {
let r = result.join().unwrap();
assert!(r.is_ok())
}
}
多线程测试编译报错:
error[E0277]: `(dyn Fn() -> Result<String, Error> + 'static)` cannot be sent between threads safely
--> src/lib.rs:41:17
|
41 | let h = thread::spawn(move || {
| ^^^^^^^^^^^^^ `(dyn Fn() -> Result<String, Error> + 'static)` cannot be sent between threads safely
|
= help: the trait `Send` is not implemented for `(dyn Fn() -> Result<String, Error> + 'static)`
= note: required because of the requirements on the impl of `Sync` for `Mutex<(dyn Fn() -> Result<String, Error> + 'static)>`
= note: required because of the requirements on the impl of `Send` for `Arc<Mutex<(dyn Fn() -> Result<String, Error> + 'static)>>`
= note: required because it appears within the type `[closure@src/lib.rs:41:31: 44:10]`
note: required by a bound in `spawn`
我想知道如何解决这个问题?我已经用 Arc
和 Mutex
包装了函数对象,为什么编译器仍然报错?
简答:将 dyn Fn() -> Result<String, Error>
更改为 dyn Fn() -> Result<String, Error> + Send + 'static
。该类型的原始定义不保证 dyn Fn
存在多长时间或它是否可以在线程之间安全地发送。
另一个注意事项,您可能需要进一步将其修改为 Box<dyn Fn() -> Result<String, Error> + Send + 'static>
,因为我看到您正在使用 new
构建 Mutex
要求它包含的类型为 Sized
,但是 dyn Trait
未调整大小。我从未尝试过使用未确定大小的类型构造互斥体,因此您可能需要进一步研究。
编辑
您不需要装箱 dyn Fn() -> _
,因为 Arc<Mutex<_>>
最初是用具体的闭包构建的,但随后由于第三个一揽子实施而强制转换为未调整大小的 Mutex<dyn Fn() -> _>
提及 here and Arc
's implementation,共 CoerceUnsized
。
我正在尝试在 rust lang 中实现一些稳定性模式(在 cloud-native-go 中引入,例如 https://github.com/cloud-native-go/examples/blob/main/ch04/circuitbreaker.go)。 基本思想是将一个函数包装在一个闭包中,它保留了一些关于函数调用的状态。我也希望我的实现是线程安全的。
我终于想到了这样的东西:
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
#[derive(PartialEq, Debug, Clone)]
pub(crate) enum Error {
UnReachable,
CircuitError,
}
pub(crate) type Circuit = Arc<Mutex<dyn Fn() -> Result<String, Error>>>;
pub(crate) fn fail_after(threshold: usize) -> Circuit {
let cnt = Arc::new(Mutex::new(0));
let f = move || {
let mut c = cnt.lock().unwrap();
*c += 1;
if *c > threshold {
return Err(Error::CircuitError);
} else {
return Ok("ok".to_owned());
}
};
return Arc::new(Mutex::new(f));
}
#[test]
fn test_debounce_first() {
let c = fail_after(1);
assert!(c.lock().unwrap()().is_ok());
assert!(c.lock().unwrap()().is_err());
}
#[test]
fn test_debounce_first_data_race() {
let c = fail_after(1);
let mut results = vec![];
for _ in 0..10 {
let d1 = Arc::clone(&c);
let h = thread::spawn(move || {
let r: Result<String, Error> = d1.lock().unwrap()();
r
});
results.push(h)
}
for result in results {
let r = result.join().unwrap();
assert!(r.is_ok())
}
}
多线程测试编译报错:
error[E0277]: `(dyn Fn() -> Result<String, Error> + 'static)` cannot be sent between threads safely
--> src/lib.rs:41:17
|
41 | let h = thread::spawn(move || {
| ^^^^^^^^^^^^^ `(dyn Fn() -> Result<String, Error> + 'static)` cannot be sent between threads safely
|
= help: the trait `Send` is not implemented for `(dyn Fn() -> Result<String, Error> + 'static)`
= note: required because of the requirements on the impl of `Sync` for `Mutex<(dyn Fn() -> Result<String, Error> + 'static)>`
= note: required because of the requirements on the impl of `Send` for `Arc<Mutex<(dyn Fn() -> Result<String, Error> + 'static)>>`
= note: required because it appears within the type `[closure@src/lib.rs:41:31: 44:10]`
note: required by a bound in `spawn`
我想知道如何解决这个问题?我已经用 Arc
和 Mutex
包装了函数对象,为什么编译器仍然报错?
简答:将 dyn Fn() -> Result<String, Error>
更改为 dyn Fn() -> Result<String, Error> + Send + 'static
。该类型的原始定义不保证 dyn Fn
存在多长时间或它是否可以在线程之间安全地发送。
另一个注意事项,您可能需要进一步将其修改为 Box<dyn Fn() -> Result<String, Error> + Send + 'static>
,因为我看到您正在使用 new
构建 Mutex
要求它包含的类型为 Sized
,但是 dyn Trait
未调整大小。我从未尝试过使用未确定大小的类型构造互斥体,因此您可能需要进一步研究。
编辑
您不需要装箱 dyn Fn() -> _
,因为 Arc<Mutex<_>>
最初是用具体的闭包构建的,但随后由于第三个一揽子实施而强制转换为未调整大小的 Mutex<dyn Fn() -> _>
提及 here and Arc
's implementation,共 CoerceUnsized
。