在 Rx 中,处理线程安全是消费者(IObserver)的责任吗?

In Rx, is it a responsibility of the consumer (IObserver) to deal with thread safety?

在 ReactiveX 范式中,处理线程安全是消费者(IObserver)的责任吗?

例如,如果在 OnNext 仍在另一个线程上执行时出现 OnCompleted 调用?

它看起来像来自 Rx .NET 来源,但 the docs 有点模糊。

自从我最初在 tweet 中提出这个问题以来,我相信我现在已经找到了权威的答案。

看来我错误地假设 thread-safe 序列化是消费者的责任 (IObserver)。

根据原始 Rx Design Guidelines 文档(看起来是 best-kept 秘密 :)

4.2. Assume observer instances are called in a serialized fashion

As Rx uses a push model and .NET supports multithreading, it is possible for different messages to arrive different execution contexts at the same time. If consumers of observable sequences would have to deal with this in every place, their code would need to perform a lot of housekeeping to avoid common concurrency problems. Code written in this fashion would be harder to maintain and potentially suffer from performance issues.

进一步:

6.7. Serialize calls to IObserver methods within observable sequence implementations Rx is a composable API, many operators can play together. If all operators had to deal with concurrency the individual operators would become very complex. Next to this, concurrency is best controlled at the place it first occurs. Finally, Consuming the Rx API would become harder if each usage of Rx would have to deal with concurrency.

最后:

6.8. Avoid serializing operators As all Rx operators are bound to guideline 6.7, operators can safely assume that their inputs are serialized. Adding too much synchronization would clutter the code and can lead to performance degradation. If an observable sequence is not following the Rx contract (see chapter 0), it is up to the developer writing the end-user application to fix the observable sequence by calling the Synchronize operator at the first place the developer gets a hold of the observable sequence. This way the scope of additional synchronization is limited to where it is needed.

我个人的看法:如果生成 IObservable 的原始序列在调用 OnNextOnErrorOnComplete(或当 Dispose 在它的订阅上被调用),它应该注意正确地序列化这些调用。