有条件的可观察链
Conditional chain of observables
我想通过多个 REST API 异步检索数据。我在 Android 上使用 Retrofit 和 rxJava 扩展,即我通过订阅 Observable 来执行任何 GET 请求。
正如我所说,我有多个来源 APIs,所以当第一个来源没有产生所需的结果时,我想尝试下一个,如果也失败,再试下一个,依此类推第四,直到查询完所有来源或找到结果。
我正在努力将这种方法转化为对 Observable 的正确使用,因为我不知道哪些运算符可以实现这种行为,而且还有一些限制需要遵守:
- 当找到一个结果时,剩余的APIs,如果有的话,不应该被查询
- 其他组件依赖于查询的结果,我希望它们在开始请求时得到一个Observable,这样这个Observable可以通知它们请求完成
- 我需要保留对上述 Observable 的引用,因为在完成之前可能会多次发出相同的请求,在这种情况下,我只会在第一次需要它时启动它,而后续请求只会获得 Observable当请求完成时通知
我一开始只有一个 API 来查询并使用以下内容进行依赖组件的请求和后续通知:
private Observable<String> loadData(int jobId) {
final ConnectableObservable<String> result = Async
.fromCallable(() -> getResult(jobId))
.publish();
getRestRequest()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
dataHolder -> {
if (dataHolder.getData() != null && !dataHolder.getData().isEmpty()) {
saveData(dataHolder.getData());
} else {
markNotFound(dataHolder);
}
},
error -> currentJobs.remove(jobId),
() -> {
currentJobs.remove(jobId);
result.connect();
});
return result;
}
这段代码只在第一次请求时调用,返回的Observable结果会保存在currentJobs中,后续请求只会获取Observable,不会再次触发请求。
非常感谢任何帮助。
假设您有一组在您每次订阅时重新连接的可观察对象:
List<Observable<Result>> suppliers = ...
那么你只需要做合乎逻辑的事情:
Observable<Result> results = Observable
.from(suppliers)
.concatMap(supplier -> supplier)
.takeFirst(result -> isAcceptable(result))
.cache()
使用.onErrorResumeNext
,假设每个服务observable可能return0个或1个元素使用first
如果没有元素被发出则发出错误:
Observable<T> a, b, c;
...
a.first().onErrorResumeNext(t -> b.first())
.onErrorResumeNext(t -> c.first())
.onErrorResumeNext(t -> d.first())
...
我想通过多个 REST API 异步检索数据。我在 Android 上使用 Retrofit 和 rxJava 扩展,即我通过订阅 Observable 来执行任何 GET 请求。
正如我所说,我有多个来源 APIs,所以当第一个来源没有产生所需的结果时,我想尝试下一个,如果也失败,再试下一个,依此类推第四,直到查询完所有来源或找到结果。
我正在努力将这种方法转化为对 Observable 的正确使用,因为我不知道哪些运算符可以实现这种行为,而且还有一些限制需要遵守:
- 当找到一个结果时,剩余的APIs,如果有的话,不应该被查询
- 其他组件依赖于查询的结果,我希望它们在开始请求时得到一个Observable,这样这个Observable可以通知它们请求完成
- 我需要保留对上述 Observable 的引用,因为在完成之前可能会多次发出相同的请求,在这种情况下,我只会在第一次需要它时启动它,而后续请求只会获得 Observable当请求完成时通知
我一开始只有一个 API 来查询并使用以下内容进行依赖组件的请求和后续通知:
private Observable<String> loadData(int jobId) {
final ConnectableObservable<String> result = Async
.fromCallable(() -> getResult(jobId))
.publish();
getRestRequest()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
dataHolder -> {
if (dataHolder.getData() != null && !dataHolder.getData().isEmpty()) {
saveData(dataHolder.getData());
} else {
markNotFound(dataHolder);
}
},
error -> currentJobs.remove(jobId),
() -> {
currentJobs.remove(jobId);
result.connect();
});
return result;
}
这段代码只在第一次请求时调用,返回的Observable结果会保存在currentJobs中,后续请求只会获取Observable,不会再次触发请求。
非常感谢任何帮助。
假设您有一组在您每次订阅时重新连接的可观察对象:
List<Observable<Result>> suppliers = ...
那么你只需要做合乎逻辑的事情:
Observable<Result> results = Observable
.from(suppliers)
.concatMap(supplier -> supplier)
.takeFirst(result -> isAcceptable(result))
.cache()
使用.onErrorResumeNext
,假设每个服务observable可能return0个或1个元素使用first
如果没有元素被发出则发出错误:
Observable<T> a, b, c;
...
a.first().onErrorResumeNext(t -> b.first())
.onErrorResumeNext(t -> c.first())
.onErrorResumeNext(t -> d.first())
...