如何在 Rust 中实现观察者模式?

How can I implement the observer pattern in Rust?

我有一个可观察的集合和一个观察者。我希望观察者成为 trait Observer 的特征实现。当某些事件发生时,可观察对象应该能够通知每个观察者。这应该可以解释我的意图:

struct A {
    observables: Vec<Observable>,
}

impl A {
    fn new() -> A {
        A {
            observables: vec![],
        }
    }
}

trait Observer {
    fn event(&mut self, _: &String);
}

impl Observer for A {
    fn event(&mut self, ev: &String) {
        println!("Got event from observable: {}", ev);
    }
}

struct Observable {
    observers: Vec<dyn Observer>, // How to contain references to observers? (this line is invalid)
}

impl Observable {
    fn new() -> Observable {
        Observable {
            observers: Vec::new(),
        }
    }

    fn add_observer(&mut self, o: &dyn Observer) {
        // incorrect line too
        self.observers.push(o);
    }

    fn remove_observer(&mut self, o: &dyn Observer) {
        // incorrect line too
        self.observers.remove(o);
    }

    fn notify_observers(&self, ev: &String) {
        for o in &mut self.observers {
            o.event(ev);
        }
    }
}

(Playground)

我收到错误:

error[E0277]: the size for values of type `(dyn Observer + 'static)` cannot be known at compilation time
  --> src/lib.rs:24:5
   |
24 |     observers: Vec<dyn Observer>, // How to contain references to observers?
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ doesn't have a size known at compile-time
   |
   = help: the trait `std::marker::Sized` is not implemented for `(dyn Observer + 'static)`
   = note: to learn more, visit <https://doc.rust-lang.org/book/second-edition/ch19-04-advanced-types.html#dynamically-sized-types-and-the-sized-trait>
   = note: required by `std::vec::Vec`

这只是我想做的模型。我在 Java、Python 和 C++ 中有这样的代码,但我不知道如何在 Rust 中实现观察者模式。我相信我的问题是在可观察对象中存储对观察者对象的引用。

根据实施选择,观察者模式可能会带来所有权挑战。

在垃圾收集语言中,通常有 Observable 引用 Observer(通知它)和 Observer 引用 Observable(通知它)注销自己)...这会在所有权方面造成一些挑战(谁比谁长寿?)并且整个 "notification on un-registering" 事情。

在 Rust(和 C++)中,我建议避免循环。


简单的解决方案

ObservableObserver 有不同的生命周期,none 拥有对方或预计比对方长寿。

use std::rc::Weak;

struct Event;

trait Observable {
    fn register(&mut self, observer: Weak<dyn Observer>);
}

trait Observer {
    fn notify(&self, event: &Event);
}

关键是将Observer分配到Rc and then hand over Weak(弱引用)到Observable

如果 Observer 需要在 Event 上修改,那么它要么需要内部可变性,要么需要包装到 RefCell 中(将 Weak<RefCell<dyn Observer>> 传递给Observable).

当通知时,Observable 会定期意识到有死的弱引用(Observer 已经消失),然后它可以懒惰地删除那些。


还有其他解决方案,例如使用 Broker(非常类似于事件循环),从推模式移动到拉模式(即(1)生成所有事件,(2)处理所有事件),但是这些与传统的观察者模式有点不同,并且有不同的 pluses/minuses 所以我不会在这里尝试对它们进行处理。

我使用了回调函数。它简单而强大,没有生命周期问题或类型擦除。

我试过了Weak<dyn Observer>,但是

  1. 它需要一个Rc
  2. 您必须重复代码才能创建不同的观察者结构。
  3. 需要类型擦除
pub struct Notifier<E> {
    subscribers: Vec<Box<dyn Fn(&E)>>,
}

impl<E> Notifier<E> {
    pub fn new() -> Notifier<E> {
        Notifier {
            subscribers: Vec::new(),
        }
    }

    pub fn register<F>(&mut self, callback: F)
    where
        F: 'static + Fn(&E),
    {
        self.subscribers.push(Box::new(callback));
    }

    pub fn notify(&self, event: E) {
        for callback in &self.subscribers {
            callback(&event);
        }
    }
}

这是我基于对这个问题的回答和许多痛苦的实现。我使用弱引用来存储观察者和 RefCell 以便能够调用可变 notify().

我正在使用 Arc because my listener could be called from any thread. If you were using a single thread, you could use Rc

每次调用dispatch(),都会检查是否有弱引用的监听器消失了。如果有,它将清理监听器列表。

pub enum Event {} // You make Event hold anything you want to fire 

pub trait Listener {
    fn notify(&mut self, event: &Event);
}

pub trait Dispatchable<T>
    where T: Listener
{
    fn register_listener(&mut self, listener: Arc<RefCell<T>>);
}

pub struct Dispatcher<T>
    where T: Listener
{
    /// A list of synchronous weak refs to listeners
    listeners: Vec<Weak<RefCell<T>>>,
}

impl<T> Dispatchable<T> for Dispatcher<T>
    where T: Listener
{
    /// Registers a new listener
    fn register_listener(&mut self, listener: Arc<RefCell<T>>) {
        self.listeners.push(Arc::downgrade(&listener));
    }
}

impl<T> Dispatcher<T>
    where T: Listener
{
    pub fn new() -> Dispatcher<T> {
        Dispatcher { listeners: Vec::new() }
    }

    pub fn num_listeners(&self) -> usize {
        self.listeners.len()
    }

    pub fn dispatch(&mut self, event: Event) {
        let mut cleanup = false;
        // Call the listeners
        for l in self.listeners.iter() {
            if let Some(mut listener_rc) = l.upgrade() {
                let mut listener = listener_rc.borrow_mut();
                listener.notify(&event);
            } else {
                println!("Cannot get listener, cleanup necessary");
                cleanup = true;
            }
        }
        // If there were invalid weak refs, clean up the list
        if cleanup {
            println!("Dispatcher is cleaning up weak refs");
            self.listeners.retain(|ref l| {
                // Only retain valid weak refs
                let got_ref = l.clone().upgrade();
                match got_ref {
                    None => false,
                    _ => true,
                }
            });
        }
    }
}

这是一个练习它的单元测试代码片段。

测试来自纸牌游戏库,其中我的 Event 枚举有 FlopDealtGameFinished 变体。该测试创建并注册了我的侦听器,并确保在调度 FlopDealt 时调用它。范围部分是这样我可以在侦听器超出范围后测试弱引用行为。我触发另一个事件并计算听众的数量以确保列表被清除。

use std::time::Instant;

#[derive(Debug)]
pub enum Event {
    FlopDealt,
    GameFinished { ended: Instant },
}

struct MyListener {
    pub flop_dealt: bool,
}

impl Listener for MyListener {
    fn notify(&mut self, event: &Event) {
        println!("Notify called with {:?}", event);
        if let Event::FlopDealt = event {
            println!("Flop dealt");
            self.flop_dealt = true;
        }
    }
}

#[test]
fn events_register() {
    let mut d: Dispatcher<MyListener> = Dispatcher::new();

    {
        let listener_rc = Arc::new(RefCell::new(MyListener { flop_dealt: false }));
        d.register_listener(listener_rc.clone());
        d.dispatch(Event::FlopDealt);

        let flop_dealt = listener_rc.borrow().flop_dealt;
        println!("Flop was {}dealt", if flop_dealt { "" } else { "not " });
        assert_eq!(flop_dealt, true);
        assert_eq!(d.num_listeners(), 1);
    }

    // Listener should disappear
    d.dispatch(Event::GameFinished {
        ended: Instant::now(),
    });
    assert_eq!(d.num_listeners(), 0);
}

我写了this回答如果你还感兴趣的话:

我试过 this 方法,对我来说效果很好,它很简单:

  • 定义你的对象struct
  • 定义你的Listeners,
  • 定义您的标准函数,我们称它们为 Extensions,
  • 通过执行Self::Fn<Listener>
  • 定义添加Emitter选项到Extensions

我在 playground is below, I just solved it in the rust forum:

中使用的相同代码
// 1. Define your object
//#[derive(Debug)]
pub struct Counter {
 count: i32,
}

// 2. (Optional), if do not want to use `#[derive(Debug)]` 
//    you can define your own debug/release format
impl std::fmt::Debug for Counter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Counter `count` is: {}", self.count)
    }
}

// 3. Define your Listeners trait 
trait EventListener {
     fn on_squared() {
        println!("Counter squared")
     }
     fn on_increased(amount: i32) {
        println!("Counter increased by {}", amount)
     }
     fn on_decreased(self, amount: i32);
}

// 4. Implement your Listeners trait to your object
impl EventListener for Counter {
    fn on_decreased(self, amount: i32) {
        println!("Counter reduced from {} to {}", &self.count, &self.count - amount)
    }
}

// 5. (Recommended), Define your standard functions/Extensions/Emitters
//    trait signatures
trait EventEmitter {
    fn square(&mut self);
    fn increase(&mut self, amount: i32);
    fn decrease(&mut self, amount: i32);
    fn change_by(&mut self, amount: i32);
}

// 6. Implement your standard functions/Extensions/Emitters trait to your object
impl EventEmitter for Counter {
    fn square(&mut self) { 
        self.count = self.count.pow(2);
        Self::on_squared();      // This is Event Emitter, calling the Listner
    }
    fn increase(&mut self, amount: i32) { 
        self.count = self.count + amount; 
        Self::on_increased(amount);   // This is Event Emitter, calling the Listner
    }
    fn decrease(&mut self, amount: i32) {
        let initial_value = self.count;
        self.count = self.count - amount;
        Self::on_decreased(Self {count: initial_value}, amount);  // This is Event Emitter, calling the Listner
    }
    fn change_by(&mut self, amount: i32) {
        let initial_value = self.count;
        self.count = self.count + amount;
        match amount {
            x if x > 0 => Self::on_increased(amount),   // This is Event Emitter, calling the Listner
            x if x < 0 => Self::on_decreased(Self {count: initial_value},  // This is Event Emitter, calling the Listneramount.abs()),
            _   => println!("No changes")
        }
    }
}

// 7. Build your main function
fn main() {
    let mut x = Counter { count: 5 };
    println!("Counter started at: {:#?}", x.count);
    x.square();   // Call the extension, which will automatically trigger the listner
    println!("{:?}", x);
    x.increase(3);
    println!("{:?}", x);
    x.decrease(2);
    println!("{:?}", x);
    x.change_by(-1);
    println!("{:?}", x);
}

并得到以下输出:

Counter started at: 5
Counter squared
Counter `count` is: 25
Counter increased by 3
Counter `count` is: 28
Counter reduced from 28 to 26
Counter `count` is: 26
Counter reduced from 26 to 25
Counter `count` is: 25

rust 设计模式 https://github.com/lpxxn/rust-design-pattern

trait IObserver {
    fn update(&self);
}

trait ISubject<'a, T: IObserver> {
    fn attach(&mut self, observer: &'a T);
    fn detach(&mut self, observer: &'a T);
    fn notify_observers(&self);
}

struct Subject<'a, T: IObserver> {
    observers: Vec<&'a T>,
}
impl<'a, T: IObserver + PartialEq> Subject<'a, T> {
    fn new() -> Subject<'a, T> {
        Subject {
            observers: Vec::new(),
        }
    }
}

impl<'a, T: IObserver + PartialEq> ISubject<'a, T> for Subject<'a, T> {
    fn attach(&mut self, observer: &'a T) {
        self.observers.push(observer);
    }
    fn detach(&mut self, observer: &'a T) {
        if let Some(idx) = self.observers.iter().position(|x| *x == observer) {
            self.observers.remove(idx);
        }
    }
    fn notify_observers(&self) {
        for item in self.observers.iter() {
            item.update();
        }
    }
}

#[derive(PartialEq)]
struct ConcreteObserver {
    id: i32,
}
impl IObserver for ConcreteObserver {
    fn update(&self) {
        println!("Observer id:{} received event!", self.id);
    }
}

fn main() {
    let mut subject = Subject::new();
    let observer_a = ConcreteObserver { id: 1 };
    let observer_b = ConcreteObserver { id: 2 };

    subject.attach(&observer_a);
    subject.attach(&observer_b);
    subject.notify_observers();

    subject.detach(&observer_b);
    subject.notify_observers();
}

输出

Observer id:1 received event!
Observer id:2 received event!
Observer id:1 received event!