通常可以将 Reactive Streams Processor 用作事件总线吗?
Is it generally OK to use a Reactive Streams Processor as an event bus?
我开始学习反应流是因为我对使用 RxJava 替代更传统事件总线的新趋势感到好奇。 This blog post 是对如何做到这一点的典型描述。如果我理解正确的话,RxJava 1.x 严格来说并不是 Reactive Streams 的实现,但它非常相似。 2.0 版包括一些 类 是合规的,或者至少通过了 TCK,因此此代码的更新版本可能看起来有点不同。
public class UserLocationModel {
private PublishSubject<LatLng> subject = PublishSubject.create();
public void setLocation(LatLng latLng) {
subject.onNext(latLng);
}
public Observable<LatLng> getUserLocation() {
return subject;
}
}
在 Reactive Streams 术语中,我认为 subject
是 Processor
,它既是 Publisher
又是 Subscriber
。
问题是在未订阅任何内容的 Subscriber
上调用 onNext
似乎违反了 Reactive Streams 规范,尤其是 rule 1.9.
这仅仅是一个实现细节吗?我是否正确地认为您通常不能依赖于兼容的 Reactive Streams 实现?
标准 RxJava 2 的 Subject
s 和 Processor
s 是宽松的,因此您不必在调用其他方法之前对它们调用 onSubscribe
。这部分是由于传统性,因为 1.x 主题没有 onSubscribe
,部分是由于 RxJava 2 处理器不协调 Subscriber
端和 Publisher
并排选择,因此对 Subscription
.
没有用
如果您订阅 RxJava Processor
任何 RS 兼容 Publisher
,它们似乎会请求 Long.MAX_VALUE
并尽可能多地中继信号。如果你订阅一个符合 RS 的 Subscriber
到 RxJava Processor
s,它们将接受那些 Subscriber
s 的背压并且永远不会溢出它们,但是,缺少请求可能会导致个别 MissingBackpressureException
被发射而 Subscriber
"thrown" 离开。 extensions library 中有一个自定义 Publisher
可以协调请求。
Am I correct in thinking that you cannot generally rely on this working with a compliant Reactive Streams implementation.
规范中没有任何内容,因此未在 TCK 中测试未收到 onSubscribe
调用但需要它的 Processor
应该发生什么,因此,我认为这已成为实施细节。
这里有两个更大的问题:
- Subjects 的发明是为了将命令式世界与反应式世界联系起来,并在 GUI 案例和非背压案例中作为事件的多播者很好地工作。在reactive-reactive多播中,它们是更好更直接的替代方案,例如
publish(Function)
.
- 在事件总线中思考是一种倒退,因为您通过在单个 "rail" 上铲入和排出事件来创建单个阻塞点。相比之下,响应式设计有利于单独且通常独立的流,其中每个流都可以根据需要在线程之间跳转,并且可能在最后一刻之前避开主线程。
我开始学习反应流是因为我对使用 RxJava 替代更传统事件总线的新趋势感到好奇。 This blog post 是对如何做到这一点的典型描述。如果我理解正确的话,RxJava 1.x 严格来说并不是 Reactive Streams 的实现,但它非常相似。 2.0 版包括一些 类 是合规的,或者至少通过了 TCK,因此此代码的更新版本可能看起来有点不同。
public class UserLocationModel {
private PublishSubject<LatLng> subject = PublishSubject.create();
public void setLocation(LatLng latLng) {
subject.onNext(latLng);
}
public Observable<LatLng> getUserLocation() {
return subject;
}
}
在 Reactive Streams 术语中,我认为 subject
是 Processor
,它既是 Publisher
又是 Subscriber
。
问题是在未订阅任何内容的 Subscriber
上调用 onNext
似乎违反了 Reactive Streams 规范,尤其是 rule 1.9.
这仅仅是一个实现细节吗?我是否正确地认为您通常不能依赖于兼容的 Reactive Streams 实现?
Subject
s 和 Processor
s 是宽松的,因此您不必在调用其他方法之前对它们调用 onSubscribe
。这部分是由于传统性,因为 1.x 主题没有 onSubscribe
,部分是由于 RxJava 2 处理器不协调 Subscriber
端和 Publisher
并排选择,因此对 Subscription
.
如果您订阅 RxJava Processor
任何 RS 兼容 Publisher
,它们似乎会请求 Long.MAX_VALUE
并尽可能多地中继信号。如果你订阅一个符合 RS 的 Subscriber
到 RxJava Processor
s,它们将接受那些 Subscriber
s 的背压并且永远不会溢出它们,但是,缺少请求可能会导致个别 MissingBackpressureException
被发射而 Subscriber
"thrown" 离开。 extensions library 中有一个自定义 Publisher
可以协调请求。
Am I correct in thinking that you cannot generally rely on this working with a compliant Reactive Streams implementation.
规范中没有任何内容,因此未在 TCK 中测试未收到 onSubscribe
调用但需要它的 Processor
应该发生什么,因此,我认为这已成为实施细节。
这里有两个更大的问题:
- Subjects 的发明是为了将命令式世界与反应式世界联系起来,并在 GUI 案例和非背压案例中作为事件的多播者很好地工作。在reactive-reactive多播中,它们是更好更直接的替代方案,例如
publish(Function)
. - 在事件总线中思考是一种倒退,因为您通过在单个 "rail" 上铲入和排出事件来创建单个阻塞点。相比之下,响应式设计有利于单独且通常独立的流,其中每个流都可以根据需要在线程之间跳转,并且可能在最后一刻之前避开主线程。