如何防御性地创建 Rx Observables 并避免竞争条件?
How to defensively create Rx Observables and avoid race conditions?
我正在寻找一个奇怪的边缘情况,其中目录中的文件列表没有显示 0...2 个文件的结果,但对 3...n 个文件工作正常。
事实证明,原来的可观察序列工作得很好。但是我在一个订阅者中使用了 PublishSubject
来传递更改的效果。据报道,所有这一切都发生在主队列上,但似乎 PublishSubject
在它有订阅者之前就得到了馈送值。 (因为没有重播,订阅者不会知道。)
所以所有组件的设置(来源 -- 中继订阅者 -- 消费订阅者)似乎引入了 time 作为一个问题。
奇怪的观察结果:
- 如果我将原来的
Observable
改成 Driver
,一切正常。
- 如果我在原始链中的某处(在中继订阅者之前)插入
.observeOn(MainScheduler.instance)
,一切正常。即使我可以通过映射操作中断点的使用看到映射已经发生在 com.apple.main-queue (serial)
上。
- 如果我在来源处或订阅者代码中使用
.subscribeOn(MainScheduler.instance)
,我仍然会遇到问题。 (可能是因为在源头它只影响中继用户,后来为时已晚。)
现在我不知道如何防御性地处理这些问题。
看来 PublishSubject
可能不太适合这种情况。但是为什么在主队列上观察会改善这种情况?
什么时候应该(防御性地)在主队列creation/production上指定可观察序列到运行?(同样,这可能是一个务实的修复,但似乎只是偶然地解决了问题。)
或者,换句话说,你应该在什么时候假设消费者代码中的事情没有及时发生,即何时设置订阅?
我无法判断从输入序列到 PublishSubject
的中继事件造成了麻烦。这是不可察觉的。让我很困惑如何避免这样的错误。
你需要了解observeOn
和subscribeOn
的区别和Driver
序列的属性。
observeOn 影响观察块,即您处理发射值的位置,例如
someObservable
.subscribe(onNext: { /* this is an example of observation block */ })
subscribeOn 影响订阅块中的调度程序,即
产生值的地方,例如
Observable<Int>.create { /* this is an example of subscription block */ }
Driver
Driver是一种特殊类型的可观察序列,具有一些性质。
- Drivers 永远不会失败,即没有
onError
事件
- Drivers 添加
.observeOn(MainScheduler.instance)
到序列
- Drivers 使用
.share(replay: 1, scope: .whileConnected)
运算符 共享序列
考虑到最后两个,你可以从 observable 做你自己的 driver:
someObservable
.observeOn(MainScheduler.instance)
.share(replay: 1, scope: .whileConnected)
这就是你的问题可能想要做的。
我正在寻找一个奇怪的边缘情况,其中目录中的文件列表没有显示 0...2 个文件的结果,但对 3...n 个文件工作正常。
事实证明,原来的可观察序列工作得很好。但是我在一个订阅者中使用了 PublishSubject
来传递更改的效果。据报道,所有这一切都发生在主队列上,但似乎 PublishSubject
在它有订阅者之前就得到了馈送值。 (因为没有重播,订阅者不会知道。)
所以所有组件的设置(来源 -- 中继订阅者 -- 消费订阅者)似乎引入了 time 作为一个问题。
奇怪的观察结果:
- 如果我将原来的
Observable
改成Driver
,一切正常。 - 如果我在原始链中的某处(在中继订阅者之前)插入
.observeOn(MainScheduler.instance)
,一切正常。即使我可以通过映射操作中断点的使用看到映射已经发生在com.apple.main-queue (serial)
上。 - 如果我在来源处或订阅者代码中使用
.subscribeOn(MainScheduler.instance)
,我仍然会遇到问题。 (可能是因为在源头它只影响中继用户,后来为时已晚。)
现在我不知道如何防御性地处理这些问题。
看来 PublishSubject
可能不太适合这种情况。但是为什么在主队列上观察会改善这种情况?
什么时候应该(防御性地)在主队列creation/production上指定可观察序列到运行?(同样,这可能是一个务实的修复,但似乎只是偶然地解决了问题。)
或者,换句话说,你应该在什么时候假设消费者代码中的事情没有及时发生,即何时设置订阅?
我无法判断从输入序列到 PublishSubject
的中继事件造成了麻烦。这是不可察觉的。让我很困惑如何避免这样的错误。
你需要了解observeOn
和subscribeOn
的区别和Driver
序列的属性。
observeOn 影响观察块,即您处理发射值的位置,例如
someObservable
.subscribe(onNext: { /* this is an example of observation block */ })
subscribeOn 影响订阅块中的调度程序,即 产生值的地方,例如
Observable<Int>.create { /* this is an example of subscription block */ }
Driver
Driver是一种特殊类型的可观察序列,具有一些性质。
- Drivers 永远不会失败,即没有
onError
事件 - Drivers 添加
.observeOn(MainScheduler.instance)
到序列 - Drivers 使用
.share(replay: 1, scope: .whileConnected)
运算符 共享序列
考虑到最后两个,你可以从 observable 做你自己的 driver:
someObservable
.observeOn(MainScheduler.instance)
.share(replay: 1, scope: .whileConnected)
这就是你的问题可能想要做的。