使用 RxJava2 中的一系列运算符一次处理一个 PublishSubject 发射

Process one PublishSubject emission at a time with a chain of operators in RxJava2

我有一个PublishSubject<Event>

在每个新的 Event 上,我触发一个数据库查询(一些本地缓存的数据),然后获取结果并尝试 POST 它通过 HTTP 向服务器请求,当该服务器回复 200 我进行另一个数据库查询以删除刚刚发送的行。

这是通过链接完成的,大致如下:

subject
  .toSerialized()
  .flatMapMaybe { getCachedData() }
  .flatMap { uploadData() }
  .flatMapCompletable { cleanCache() }
  .subscribe()

subject 可能会在特定条件下发出两个快速 Events,假设间隔为 10 毫秒。

问题getCachedData() 因为第二次发射在 getCachedData()[=48 之后立即关闭=] 第一次发射完成,即在 cleanCache() 有机会在第二次发射之前清理数据库。

我想以某种方式将那些 flatMaps 合并到一个观察者中,以便 subject 在整个链完成之前不会产生新的发射, 最好没有任何种类的手工信号量。

我在单个线程池调度程序上执行 subscribeOn(),它只对每个 flatMap.[=12= 中的调用进行排序]

我看到一些建议将 toSerialized() 添加到主题,但现在我认为它与链的工作方式无关。

我看到还有 lift()compose() 运算符。我试图将所有 flatMaps 放在后者中,这并没有改变行为。前者我还在纳闷

将它们放在 concatMapX 子流中:

subject
.concatMapCompletable {
    getCachedData()
    .flatMap { uploadData() }
    .flatMapCompletable { cleanCache() }
}

I saw some advice to add toSerialized() to subjects, but now I think it has no relation to the way chains work

它对您的流程没有实际影响,除非您实际驱动它返回的 Subject 和多个线程。