链接具有不同发射类型的多个可观察量

Chaining multiple observables with different emission types

我正在尝试弄清楚如何将 observables 链接在一起。我有一个现有的方法:public static Observable<Data> getData()。在我的另一个 class 中,我有这个现有代码:

doSomeBackgroundWork()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<..>() { ... })

我现在想将 getData() 调用链接到此调用。我该怎么做?我最初试过这个:

doSomeBackgroundWork()
.flatMap(s -> call() {
   mApi.getData()
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<..>() { ... })

但这行不通,因为getData()代码实际上是在主线程上执行的。

即使这样也行不通:

doSomeBackgroundWork()
.concatMap(s -> call() {
   mApi.getData()
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<..>() { ... })

此外,当我尝试这个时,问题是 zipWith 意味着两个观察值 运行 并行,我真的想 运行 一个接一个地 [=17] =]

doSomeBackgroundWork()
.zipWith(mApi.getData()),
    new Func2<BgWork, DataResponse,DataResponse>() {
    @Override
    public DataResponse call(BgWork bgWork, DatResponse data) {
       return data;
    }})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<..>() { ... })

flatMap 运算符是这里的方法,您只需要处理并发性。如果你想在 io 调度器上 运行 整个 getData() 方法,那么你可以在 flatMap 之前应用 observeOn 运算符,然后像这样在它之后再次应用:

doSomeBackgroundWork()
  .observeOn(Schedulers.io())
  .flatMap(s -> call() {
       mApi.getData()
  }
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Subscriber<..>() { ... })

你看,subscribeOn 运算符强制生产者 "compute" 并在提供的调度程序上发出数据,所以你在流组合中的什么地方使用它并不重要,它也没有多次使用时的效果。但 observeOn 运算符并非如此。它而是告诉下一个流在另一个调度程序上执行工作。这意味着,当您稍后再次使用它时,您可以再次将计算转移到另一个调度程序。

但是,如果您只需要在另一个调度程序上执行从 getData() 方法返回的可观察对象产生的工作,您可以在这个可观察对象上而不是在主流上使用 subscribeOn

doSomeBackgroundWork()
  .flatMap(s -> call() {
       mApi.getData().subscribeOn(Schedulers.io())
  }
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Subscriber<..>() { ... })