RxJava:在发出来自另一个可观察对象的数据之前,阻止一个可观察对象发出
RxJava: prevent an observable from emitting until data from another observable is emitted
下面的代码块设计为离线优先。如果数据由 memory observable 发出,则本地和远程 observable 将永远不会触发。如果数据未保存在内存中,则本地可观察对象将尝试从房间数据库中读取数据,如果所有其他方法均失败,则远程可观察对象会查询 API.
远程源使用改造来发送查询和 returns 一个 flowable,然后将其转换为一个 observable。然而,在远程可观察对象触发之前,我有另一个可观察对象,即查询所需的 returns 位置数据。换句话说,remote observable 依赖于 location observable。在位置数据可用之前,如何使用 RxJava 来防止在 Concat 运算符中调用远程可观察对象?
locationObservable = locationSource.getLocationObservable();
memory = source.getSuggestionsFromMemory();
local = source.getSuggestionsFromDisk();
remote = source.getSuggestionsFromNetwork(parameters)
.skipUntil(locationObservable);
locationObservable.subscribe(
source -> parameters = ParamManager.queryParameters(
source.getLatitude() + "," + source.getLongitude()),
error -> Log.println(Log.ERROR, TAG, error.getMessage()
)
);
Observable.concat(memory,local, remote)
.firstElement()
.subscribeOn(Schedulers.io())
.toObservable()
.observeOn(AndroidSchedulers.mainThread());
远程可观察对象:
public Observable<List<Venue>> getSuggestionsFromNetwork(HashMap<String, String> parameters){
return remoteSource.getData(parameters).doOnNext(
data -> {
localSource.cacheDataToDisk(data);
memorySource.cacheDataInMemory(data);
});
}
远程来源:
Observable<List<Venue>> getData(HashMap<String, String> params){
return Flowable.zip(loadSearchVenues(params), loadTrendingVenues(params),
loadRecommendedVenues(params), (search, trending, recommended) -> {
generalVenues = search.getResponse().getSuggestions();
trendingVenues = trending.getResponse().getSuggestions();
recommendedVenues = recommended.getResponse().getSuggestions();
allVenues.addAll(generalVenues);
allVenues.addAll(trendingVenues);
allVenues.addAll(recommendedVenues);
return allVenues;
}).toObservable();
}
错误:
2019-11-15 09:18:08.703 29428-29491/com.example.suggest E/MemorySource: getData() called
2019-11-15 09:18:08.703 29428-29491/com.example.suggest E/LocalSource: getData() called
2019-11-15 09:18:08.767 29428-29428/com.example.suggest E/MainViewModel: Query map was null (parameter #3)
如果您可以等到下一个位置发射,您可以执行以下操作:
locationObservable = locationSource.getLocationObservable();
memory = source.getSuggestionsFromMemory();
local = source.getSuggestionsFromDisk();
remote = locationObservable
.map(source -> ParamManager.queryParameters(source.getLatitude() + ","
+ source.getLongitude()))
.concatMap(params -> source.getSuggestionsFromNetwork(params));
Observable
.concat(memory,local, remote)
.firstElement()
...
但如果不能,则必须将最后一个位置存储在可以直接使用的变量中,例如:
remote = Optional.ofNullable(getLastLocation())
.map(Observable::just)
.orElse(locationObservable)
.map(source -> ParamManager.queryParameters(source.getLatitude() + ","
+ source.getLongitude()))
.concatMap(params -> source.getSuggestionsFromNetwork(params));
其他地方:
locationObservable.subscribe(location -> setLastLocation(location));
下面的代码块设计为离线优先。如果数据由 memory observable 发出,则本地和远程 observable 将永远不会触发。如果数据未保存在内存中,则本地可观察对象将尝试从房间数据库中读取数据,如果所有其他方法均失败,则远程可观察对象会查询 API.
远程源使用改造来发送查询和 returns 一个 flowable,然后将其转换为一个 observable。然而,在远程可观察对象触发之前,我有另一个可观察对象,即查询所需的 returns 位置数据。换句话说,remote observable 依赖于 location observable。在位置数据可用之前,如何使用 RxJava 来防止在 Concat 运算符中调用远程可观察对象?
locationObservable = locationSource.getLocationObservable();
memory = source.getSuggestionsFromMemory();
local = source.getSuggestionsFromDisk();
remote = source.getSuggestionsFromNetwork(parameters)
.skipUntil(locationObservable);
locationObservable.subscribe(
source -> parameters = ParamManager.queryParameters(
source.getLatitude() + "," + source.getLongitude()),
error -> Log.println(Log.ERROR, TAG, error.getMessage()
)
);
Observable.concat(memory,local, remote)
.firstElement()
.subscribeOn(Schedulers.io())
.toObservable()
.observeOn(AndroidSchedulers.mainThread());
远程可观察对象:
public Observable<List<Venue>> getSuggestionsFromNetwork(HashMap<String, String> parameters){
return remoteSource.getData(parameters).doOnNext(
data -> {
localSource.cacheDataToDisk(data);
memorySource.cacheDataInMemory(data);
});
}
远程来源:
Observable<List<Venue>> getData(HashMap<String, String> params){
return Flowable.zip(loadSearchVenues(params), loadTrendingVenues(params),
loadRecommendedVenues(params), (search, trending, recommended) -> {
generalVenues = search.getResponse().getSuggestions();
trendingVenues = trending.getResponse().getSuggestions();
recommendedVenues = recommended.getResponse().getSuggestions();
allVenues.addAll(generalVenues);
allVenues.addAll(trendingVenues);
allVenues.addAll(recommendedVenues);
return allVenues;
}).toObservable();
}
错误:
2019-11-15 09:18:08.703 29428-29491/com.example.suggest E/MemorySource: getData() called
2019-11-15 09:18:08.703 29428-29491/com.example.suggest E/LocalSource: getData() called
2019-11-15 09:18:08.767 29428-29428/com.example.suggest E/MainViewModel: Query map was null (parameter #3)
如果您可以等到下一个位置发射,您可以执行以下操作:
locationObservable = locationSource.getLocationObservable();
memory = source.getSuggestionsFromMemory();
local = source.getSuggestionsFromDisk();
remote = locationObservable
.map(source -> ParamManager.queryParameters(source.getLatitude() + ","
+ source.getLongitude()))
.concatMap(params -> source.getSuggestionsFromNetwork(params));
Observable
.concat(memory,local, remote)
.firstElement()
...
但如果不能,则必须将最后一个位置存储在可以直接使用的变量中,例如:
remote = Optional.ofNullable(getLastLocation())
.map(Observable::just)
.orElse(locationObservable)
.map(source -> ParamManager.queryParameters(source.getLatitude() + ","
+ source.getLongitude()))
.concatMap(params -> source.getSuggestionsFromNetwork(params));
其他地方:
locationObservable.subscribe(location -> setLastLocation(location));