如何为事件总线使用 Reactive Extensions

How to use Reactive Extensions for an event bus

我很难弄清楚如何使用反应式扩展在 C# 中创建事件总线,而没有主题 class,据我所知,这是不推荐的。

大多数 IEvents 都是我自己的,但一些像鼠标和键盘事件将由 WPF 提供。

我更喜欢将事件发布到事件总线的想法,而不是使用 Observable.FromEventPattern 在任何地方使用事件处理程序,因为其中一些事件有时只会由订阅者记录,而不会被执行。

这是显示我正在尝试做的事情的片段。

 public interface IEvent { } // marker interface

 public class BarcodeReaderEvent : EventArgs, IEvent
 { }

 public class MouseEvent : EventArgs, IEvent
 { }

 public class MyEventBus
 {
     private static IObservable<IEvent> eventBus = ??

     public void Post<IEvent>(IEvent theEvent)
     {
       // What goes here? 
     }

     public IDisposable Subscribe()
     {
         return ??
     }
 }

与使用事件总线一样不推荐使用 subject。如果你决定有一个单一的点,每个人都可以发布和收听,那么不妨全力以赴,也使用一个主题。

正如其他帖子中所建议的那样,标准建议是使用专门的 resources/services 来公开特定领域的事件。而不是拥有一个通用的事件 Bus/Event 聚合器(反模式 IMO),每个客户端都必须过滤掉他们希望发布的消息。他们应该只订阅专用端点。

如果您想使用您提出的结构,则需要使用主题。

您的代码基本上如下所示:

public interface IEvent { }

public class BarcodeReaderEvent : EventArgs, IEvent { }

public class MouseEvent : EventArgs, IEvent { }

public class MyEventBus
{
    private Subject<IEvent> _subject = new Subject<IEvent>();
    private IObservable<IEvent> _eventBus;

    public MyEventBus()
    {
        _eventBus = _subject.AsObservable();
    }

    public void Post(IEvent theEvent)
    {
        _subject.OnNext(theEvent);
    }

    public IDisposable Subscribe(IObserver<IEvent> observer)
    {
        return _eventBus.Subscribe(observer);
     }
}

但是,主题的问题在于,如果主题出现错误,那么每个对该主题的订阅都会收到 OnError 通知,然后所有订阅都会结束。

您的代码中确实也有一些“怪异之处”。您的 private static IObservable<IEvent> eventBusstatic,但 class 不是。我不知道那是不是故意的。此外 public void Post<IEvent>(IEvent theEvent) 似乎暗示您希望将 IEvent 作为通用类型传递(这意味着您正在使用通用类型隐藏实际接口类型)。可能你指的是 public void Post<T>(T theEvent) where T : IEvent,但我说不出来。

你可以试试这个,我觉得这个更接近你的实际需要,看看它是否更适合你:

public class MyEventBus
{
    private Subject<IEvent> _subject = new Subject<IEvent>();

    public void Post<T>(T message) where T : IEvent
    {
        _subject.OnNext(message);
    }

    public IObservable<T> AsObservable<T>() where T : IEvent
    {
        return _subject.OfType<T>();
    }
}

或者,您可能想看看这个使用 Rx 的更复杂的事件总线示例:https://github.com/reactiveui/ReactiveUI/blob/main/src/ReactiveUI/Routing/MessageBus.cs