如何防御性地创建 Rx Observables 并避免竞争条件?

How to defensively create Rx Observables and avoid race conditions?

我正在寻找一个奇怪的边缘情况,其中目录中的文件列表没有显示 0...2 个文件的结果,但对 3...n 个文件工作正常。

事实证明,原来的可观察序列工作得很好。但是我在一个订阅者中使用了 PublishSubject 来传递更改的效果。据报道,所有这一切都发生在主队列上,但似乎 PublishSubject 在它有订阅者之前就得到了馈送值。 (因为没有重播,订阅者不会知道。)

所以所有组件的设置(来源 -- 中继订阅者 -- 消费订阅者)似乎引入了 time 作为一个问题。

奇怪的观察结果:

现在我不知道如何防御性地处理这些问题。

看来 PublishSubject 可能不太适合这种情况。但是为什么在主队列上观察会改善这种情况?

什么时候应该(防御性地)在主队列creation/production上指定可观察序列到运行?(同样,这可能是一个务实的修复,但似乎只是偶然地解决了问题。)

或者,换句话说,你应该在什么时候假设消费者代码中的事情没有及时发生,即何时设置订阅?

我无法判断从输入序列到 PublishSubject 的中继事件造成了麻烦。这是不可察觉的。让我很困惑如何避免这样的错误。

你需要了解observeOnsubscribeOn的区别和Driver序列的属性。

observeOn 影响观察块,即您处理发射值的位置,例如

someObservable
.subscribe(onNext: { /* this is an example of observation block */ })

subscribeOn 影响订阅块中的调度程序,即 产生值的地方,例如

Observable<Int>.create { /* this is an example of subscription block */ }

Driver

Driver是一种特殊类型的可观察序列,具有一些性质。

  • Drivers 永远不会失败,即没有 onError 事件
  • Drivers 添加 .observeOn(MainScheduler.instance) 到序列
  • Drivers 使用 .share(replay: 1, scope: .whileConnected) 运算符
  • 共享序列

考虑到最后两个,你可以从 observable 做你自己的 driver:

someObservable
.observeOn(MainScheduler.instance)
.share(replay: 1, scope: .whileConnected)

这就是你的问题可能想要做的。