通常可以将 Reactive Streams Processor 用作事件总线吗?

Is it generally OK to use a Reactive Streams Processor as an event bus?

我开始学习反应流是因为我对使用 RxJava 替代更传统事件总线的新趋势感到好奇。 This blog post 是对如何做到这一点的典型描述。如果我理解正确的话,RxJava 1.x 严格来说并不是 Reactive Streams 的实现,但它非常相似。 2.0 版包括一些 类 是合规的,或者至少通过了 TCK,因此此代码的更新版本可能看起来有点不同。

public class UserLocationModel {

  private PublishSubject<LatLng> subject = PublishSubject.create();

  public void setLocation(LatLng latLng) {
    subject.onNext(latLng);
  }

  public Observable<LatLng> getUserLocation() {
    return subject;
  }
}

在 Reactive Streams 术语中,我认为 subjectProcessor,它既是 Publisher 又是 Subscriber

问题是在未订阅任何内容的 Subscriber 上调用 onNext 似乎违反了 Reactive Streams 规范,尤其是 rule 1.9.

这仅仅是一个实现细节吗?我是否正确地认为您通常不能依赖于兼容的 Reactive Streams 实现?

标准 RxJava 2 的

Subjects 和 Processors 是宽松的,因此您不必在调用其他方法之前对它们调用 onSubscribe 。这部分是由于传统性,因为 1.x 主题没有 onSubscribe,部分是由于 RxJava 2 处理器不协调 Subscriber 端和 Publisher 并排选择,因此对 Subscription.

没有用

如果您订阅 RxJava Processor 任何 RS 兼容 Publisher,它们似乎会请求 Long.MAX_VALUE 并尽可能多地中继信号。如果你订阅一个符合 RS 的 Subscriber 到 RxJava Processors,它们将接受那些 Subscribers 的背压并且永远不会溢出它们,但是,缺少请求可能会导致个别 MissingBackpressureException 被发射而 Subscriber "thrown" 离开。 extensions library 中有一个自定义 Publisher 可以协调请求。

Am I correct in thinking that you cannot generally rely on this working with a compliant Reactive Streams implementation.

规范中没有任何内容,因此未在 TCK 中测试未收到 onSubscribe 调用但需要它的 Processor 应该发生什么,因此,我认为这已成为实施细节。

这里有两个更大的问题:

  1. Subjects 的发明是为了将命令式世界与反应式世界联系起来,并在 GUI 案例和非背压案例中作为事件的多播者很好地工作。在reactive-reactive多播中,它们是更好更直接的替代方案,例如publish(Function).
  2. 在事件总线中思考是一种倒退,因为您通过在单个 "rail" 上铲入和排出事件来创建单个阻塞点。相比之下,响应式设计有利于单独且通常独立的流,其中每个流都可以根据需要在线程之间跳转,并且可能在最后一刻之前避开主线程。