SerializedSubject 是 RxJava 线程安全所必需的吗

Is SerializedSubject necessary for thread-safety in RxJava

我在 RxJava 中创建了一个 Subject 实例并从多个线程调用它的 onNext():

PublishSubject<String> subject = PublishSubject.create();
//...
subject.onNext("A");  //thread A
subject.onNext("B");  //thread B

RxJava documentation 表示:

take care not to call its onNext( ) method (or its other on methods) from multiple threads, as this could lead to non-serialized calls, which violates the Observable contract and creates an ambiguity in the resulting Subject.

Do I have to call toSerialized() on such Subject assuming I don't care if "A" goes before or after "B"?

是的,使用 toSerialized(),因为应用于该主题的所有运算符都假定正确的序列化发生在上游。如果不发生这种情况,流可能会失败或产生意外结果。

Is Subject thread-safe anyway or will I break RxJava without toSerialized()?

以上回答

What is the "Observable contract" that the documentation mentions?

Rx Design Guidelines.pdf 第 4 节定义了 Observable 契约:

4.2. Assume observer instances are called in a serialized fashion

As Rx uses a push model and .NET supports multithreading, it is possible for different messages to arrive different execution contexts at the same time. If consumers of observable sequences would have to deal with this in every place, their code would need to perform a lot of housekeeping to avoid common concurrency problems. Code written in this fashion would be harder to maintain and potentially suffer from performance issues.

我认为 RxJava 文档应该使它更容易被发现,所以我会提出一个问题。

根据 Dave 的回答,如果您事先知道您的主题将被不同的线程访问,您可以将它包装成一个 SerializedSubject http://reactivex.io/RxJava/javadoc/rx/subjects/SerializedSubject.html

Wraps a Subject so that it is safe to call its various on methods from different threads.

喜欢: private final Subject<Object, Object> bus = new SerializedSubject<Object, Object>(PublishSubject.create());

(取自 Ben Christensen 的 EventBus 示例:http://bl.ocks.org/benjchristensen/04eef9ca0851f3a5d7bf