RxJS Observables 只能由一个订阅者处理吗?

Can RxJS Observables be only handled by one subscriber?

我正在尝试了解 Observables 和 RxJS 是如何工作的,所以这可能根本不是如何使用它们的关键。

我有一个 Angular2 应用程序,并且还在使用 RxJS Observables 来发送事件。现在对于一种特殊类型的错误事件,我想知道该事件是否已被另一个订阅者处理。 Observable 上可能存在多个订阅者,一些订阅者可能对事件承担全部责任,这样其他订阅者就不会再收到它了。

这个想法来自路由事件在 WPF 中的工作方式。在事件处理程序中,您将获得 RoutedEventArgs parameter, which has a Property Handled:

If setting, set to true if the event is to be marked handled; otherwise false. If reading this value, true indicates that either a class handler, or some instance handler along the route, has already marked this event handled. false.indicates that no such handler has marked the event handled.

另一个实现示例是中间件在 ASP.NET 核心管道中的工作方式 - https://docs.microsoft.com/en-us/aspnet/core/fundamentals/middleware - 您可以调用下一个中间件或仅 return 结果。

我正在考虑将 Handled 属性 添加到我将放入可观察管道的事件中,但我不确定这是否是在 RxJS 中执行此操作的惯用方式。

通常,您对可观察对象执行此操作的方式是不要将可观察对象交给每个人,并且每个人都订阅它。相反,您给每个感兴趣的人一个 "add to the pipeline" 的机会,然后最后订阅一次。有很多方法可以做到这一点。最简单的是实际上不给任何人可观察的。但是让他们为您提供回调:

class Foo {

   observable = ...;
   callbacks = [];

   addCallback(callback) { this.callbacks.push(callback); }

   constructor() {

       // subscribe to the observable and call any registered callbacks
       this.observable.subscribe(e => {
           for (const cb of this.callbacks) {
               // callback returns true if it has "handled" the event
               if (cb(e)) {
                   return; // do not call any other callbacks
               }
           }
       });
   }
}

const foo = new Foo();

// subscriber1 handles "type1" events
foo.addCallback(ev => ev.type === "type1");

// subscriber2
foo.addCallback(ev => ev.type === "type2");

这是最简单的方法。还有其他方法 Foo 为每个客户端公开可观察对象并监控其结果以构建管道。