链接具有不同发射类型的多个可观察量
Chaining multiple observables with different emission types
我正在尝试弄清楚如何将 observables 链接在一起。我有一个现有的方法:public static Observable<Data> getData()
。在我的另一个 class 中,我有这个现有代码:
doSomeBackgroundWork()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<..>() { ... })
我现在想将 getData()
调用链接到此调用。我该怎么做?我最初试过这个:
doSomeBackgroundWork()
.flatMap(s -> call() {
mApi.getData()
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<..>() { ... })
但这行不通,因为getData()代码实际上是在主线程上执行的。
即使这样也行不通:
doSomeBackgroundWork()
.concatMap(s -> call() {
mApi.getData()
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<..>() { ... })
此外,当我尝试这个时,问题是 zipWith
意味着两个观察值 运行 并行,我真的想 运行 一个接一个地 [=17] =]
doSomeBackgroundWork()
.zipWith(mApi.getData()),
new Func2<BgWork, DataResponse,DataResponse>() {
@Override
public DataResponse call(BgWork bgWork, DatResponse data) {
return data;
}})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<..>() { ... })
flatMap
运算符是这里的方法,您只需要处理并发性。如果你想在 io
调度器上 运行 整个 getData()
方法,那么你可以在 flatMap
之前应用 observeOn
运算符,然后像这样在它之后再次应用:
doSomeBackgroundWork()
.observeOn(Schedulers.io())
.flatMap(s -> call() {
mApi.getData()
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<..>() { ... })
你看,subscribeOn
运算符强制生产者 "compute" 并在提供的调度程序上发出数据,所以你在流组合中的什么地方使用它并不重要,它也没有多次使用时的效果。但 observeOn
运算符并非如此。它而是告诉下一个流在另一个调度程序上执行工作。这意味着,当您稍后再次使用它时,您可以再次将计算转移到另一个调度程序。
但是,如果您只需要在另一个调度程序上执行从 getData()
方法返回的可观察对象产生的工作,您可以在这个可观察对象上而不是在主流上使用 subscribeOn
。
doSomeBackgroundWork()
.flatMap(s -> call() {
mApi.getData().subscribeOn(Schedulers.io())
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<..>() { ... })
我正在尝试弄清楚如何将 observables 链接在一起。我有一个现有的方法:public static Observable<Data> getData()
。在我的另一个 class 中,我有这个现有代码:
doSomeBackgroundWork()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<..>() { ... })
我现在想将 getData()
调用链接到此调用。我该怎么做?我最初试过这个:
doSomeBackgroundWork()
.flatMap(s -> call() {
mApi.getData()
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<..>() { ... })
但这行不通,因为getData()代码实际上是在主线程上执行的。
即使这样也行不通:
doSomeBackgroundWork()
.concatMap(s -> call() {
mApi.getData()
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<..>() { ... })
此外,当我尝试这个时,问题是 zipWith
意味着两个观察值 运行 并行,我真的想 运行 一个接一个地 [=17] =]
doSomeBackgroundWork()
.zipWith(mApi.getData()),
new Func2<BgWork, DataResponse,DataResponse>() {
@Override
public DataResponse call(BgWork bgWork, DatResponse data) {
return data;
}})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<..>() { ... })
flatMap
运算符是这里的方法,您只需要处理并发性。如果你想在 io
调度器上 运行 整个 getData()
方法,那么你可以在 flatMap
之前应用 observeOn
运算符,然后像这样在它之后再次应用:
doSomeBackgroundWork()
.observeOn(Schedulers.io())
.flatMap(s -> call() {
mApi.getData()
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<..>() { ... })
你看,subscribeOn
运算符强制生产者 "compute" 并在提供的调度程序上发出数据,所以你在流组合中的什么地方使用它并不重要,它也没有多次使用时的效果。但 observeOn
运算符并非如此。它而是告诉下一个流在另一个调度程序上执行工作。这意味着,当您稍后再次使用它时,您可以再次将计算转移到另一个调度程序。
但是,如果您只需要在另一个调度程序上执行从 getData()
方法返回的可观察对象产生的工作,您可以在这个可观察对象上而不是在主流上使用 subscribeOn
。
doSomeBackgroundWork()
.flatMap(s -> call() {
mApi.getData().subscribeOn(Schedulers.io())
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<..>() { ... })