使用 RxJava2 中的一系列运算符一次处理一个 PublishSubject 发射
Process one PublishSubject emission at a time with a chain of operators in RxJava2
我有一个PublishSubject<Event>
。
在每个新的 Event 上,我触发一个数据库查询(一些本地缓存的数据),然后获取结果并尝试 POST 它通过 HTTP 向服务器请求,当该服务器回复 200 我进行另一个数据库查询以删除刚刚发送的行。
这是通过链接完成的,大致如下:
subject
.toSerialized()
.flatMapMaybe { getCachedData() }
.flatMap { uploadData() }
.flatMapCompletable { cleanCache() }
.subscribe()
subject 可能会在特定条件下发出两个快速 Events,假设间隔为 10 毫秒。
问题 是 getCachedData() 因为第二次发射在 getCachedData()[=48 之后立即关闭=] 第一次发射完成,即在 cleanCache() 有机会在第二次发射之前清理数据库。
我想以某种方式将那些 flatMaps 合并到一个观察者中,以便 subject 在整个链完成之前不会产生新的发射, 最好没有任何种类的手工信号量。
我在单个线程池调度程序上执行 subscribeOn(),它只对每个 flatMap.[=12= 中的调用进行排序]
我看到一些建议将 toSerialized() 添加到主题,但现在我认为它与链的工作方式无关。
我看到还有 lift() 和 compose() 运算符。我试图将所有 flatMaps 放在后者中,这并没有改变行为。前者我还在纳闷
将它们放在 concatMapX
子流中:
subject
.concatMapCompletable {
getCachedData()
.flatMap { uploadData() }
.flatMapCompletable { cleanCache() }
}
I saw some advice to add toSerialized() to subjects, but now I think it has no relation to the way chains work
它对您的流程没有实际影响,除非您实际驱动它返回的 Subject
和多个线程。
我有一个PublishSubject<Event>
。
在每个新的 Event 上,我触发一个数据库查询(一些本地缓存的数据),然后获取结果并尝试 POST 它通过 HTTP 向服务器请求,当该服务器回复 200 我进行另一个数据库查询以删除刚刚发送的行。
这是通过链接完成的,大致如下:
subject
.toSerialized()
.flatMapMaybe { getCachedData() }
.flatMap { uploadData() }
.flatMapCompletable { cleanCache() }
.subscribe()
subject 可能会在特定条件下发出两个快速 Events,假设间隔为 10 毫秒。
问题 是 getCachedData() 因为第二次发射在 getCachedData()[=48 之后立即关闭=] 第一次发射完成,即在 cleanCache() 有机会在第二次发射之前清理数据库。
我想以某种方式将那些 flatMaps 合并到一个观察者中,以便 subject 在整个链完成之前不会产生新的发射, 最好没有任何种类的手工信号量。
我在单个线程池调度程序上执行 subscribeOn(),它只对每个 flatMap.[=12= 中的调用进行排序]
我看到一些建议将 toSerialized() 添加到主题,但现在我认为它与链的工作方式无关。
我看到还有 lift() 和 compose() 运算符。我试图将所有 flatMaps 放在后者中,这并没有改变行为。前者我还在纳闷
将它们放在 concatMapX
子流中:
subject
.concatMapCompletable {
getCachedData()
.flatMap { uploadData() }
.flatMapCompletable { cleanCache() }
}
I saw some advice to add toSerialized() to subjects, but now I think it has no relation to the way chains work
它对您的流程没有实际影响,除非您实际驱动它返回的 Subject
和多个线程。