RxJava 平面图:当其中一个可观察结果完成时会发生什么?
RxJava flat map: what happens when one of the resulting observable complete?
我是 RxJava 的新手,我知道平面图用于将发出的项目映射到可观察对象。我还知道,基于 documentation 发出的可观察对象全部组合(展平)为单个可观察流。
我想知道如果这些内部可观察对象中的任何一个完成会发生什么?
例如:我有一个发出项目数据键的可观察对象。我必须进行另一个异步 http 调用以从服务器获取项目数据,因此我使用另一个可观察对象来调用它。我使用平面图连接这两个并创建一个主要的可观察对象。
什么时候调用跟随 "SomeMethodThatWantsItems" 的 运行() 方法?
public void someMethodThatWantsItems(MyHttpCaller httpCaller, SomeSearchEngine searchEngine)
{
Consumer<Item> onNextConsumer =
Observable<Item> searchObservable = getSearchResult(httpCaller, searchEngine, "The Search Word");
searchObservable
.subscribeOn(Schedulers.newThread())
.subscribe(new Consumer<Item>(){
@Override
public void accept(@NonNull Item item) throws Exception {
//Do stuff with the item
}
}
, new Consumer<Exception>() { //some implementation of onErrorConsumer
}
//OnComplete
, new Action(){
@Override
public void run() throws Exception {
//When does this get called??? after the search complete or when the first http call is successful?
}
});
}
private Observable<String> getSearchResultKeys(SomeSearchEngine searchEngine, String someSearchWord)
{
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull final ObservableEmitter<String> emitter) throws Exception {
//Assume that our search engine call onFind everytime it finds something
searchEngine.addSearchListener(new searchEngineResultListener(){
@Override
public void onFind(String foundItemKey){
emitter.onNext(foundItemKey);
}
@Override
public void onFinishedFindingResults(){
emitter.onComplete();
}
});
}
});
}
private Observable<Item> getItemByKey(MyHttpCaller httpCaller, String key)
{
return Observable.create(new ObservableOnSubscribe<Item>() {
@Override
public void subscribe(@NonNull final ObservableEmitter<Item> emitter) throws Exception {
//Call the server to get the item
httpCaller.call(key, new onCompleteListener(){
@Override
public void onCompletedCall(Item result)
{
emitter.onNext(result);
//The result is complete! end the stream
emitter.onComplete();
}
});
}
});
}
public Observable<Item> getSearchResult(MyHttpCaller httpCaller, SomeSearchEngine searchEngine, String someSearchWord){
//Where everything comes together
Observable<String> searchResultObservable = getSearchResultKeys(searchEngine, someSearchWord);
retuern searchResultObservable
.observeOn(Schedulers.newThread())
.flatMap(new Function<String, Observable<Item>>(){
@Override
public Observable<Item> apply(String key){
return getItemByKey(httpCaller, key);
}
});
}
onComplete()
总是接到一次电话,然后流停止。 (这是 Observable Contract 的一部分)。
这意味着在您的情况下,您在 SomeMethodThatWantsItems
的 onComplete()
将在检索到 所有 项后被调用。
在 flatMap()
的情况下,每个内部 Observable
完成后,将简单地向源 Observable
发出信号,停止将项目从内部 Observable
扁平化到源 Observable
, flatMap()
只要此流发送项目,就会合并来自内部 Observable
的项目,因此它基本上将整个内部 Observable
流消耗到源流中,整个流直到终止 event3 就像 onComplete()
,因此如果内部 Observable
可以发射超过 1 个项目,这意味着它将在源流上发射超过 1 个。
我是 RxJava 的新手,我知道平面图用于将发出的项目映射到可观察对象。我还知道,基于 documentation 发出的可观察对象全部组合(展平)为单个可观察流。
我想知道如果这些内部可观察对象中的任何一个完成会发生什么?
例如:我有一个发出项目数据键的可观察对象。我必须进行另一个异步 http 调用以从服务器获取项目数据,因此我使用另一个可观察对象来调用它。我使用平面图连接这两个并创建一个主要的可观察对象。
什么时候调用跟随 "SomeMethodThatWantsItems" 的 运行() 方法?
public void someMethodThatWantsItems(MyHttpCaller httpCaller, SomeSearchEngine searchEngine)
{
Consumer<Item> onNextConsumer =
Observable<Item> searchObservable = getSearchResult(httpCaller, searchEngine, "The Search Word");
searchObservable
.subscribeOn(Schedulers.newThread())
.subscribe(new Consumer<Item>(){
@Override
public void accept(@NonNull Item item) throws Exception {
//Do stuff with the item
}
}
, new Consumer<Exception>() { //some implementation of onErrorConsumer
}
//OnComplete
, new Action(){
@Override
public void run() throws Exception {
//When does this get called??? after the search complete or when the first http call is successful?
}
});
}
private Observable<String> getSearchResultKeys(SomeSearchEngine searchEngine, String someSearchWord)
{
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull final ObservableEmitter<String> emitter) throws Exception {
//Assume that our search engine call onFind everytime it finds something
searchEngine.addSearchListener(new searchEngineResultListener(){
@Override
public void onFind(String foundItemKey){
emitter.onNext(foundItemKey);
}
@Override
public void onFinishedFindingResults(){
emitter.onComplete();
}
});
}
});
}
private Observable<Item> getItemByKey(MyHttpCaller httpCaller, String key)
{
return Observable.create(new ObservableOnSubscribe<Item>() {
@Override
public void subscribe(@NonNull final ObservableEmitter<Item> emitter) throws Exception {
//Call the server to get the item
httpCaller.call(key, new onCompleteListener(){
@Override
public void onCompletedCall(Item result)
{
emitter.onNext(result);
//The result is complete! end the stream
emitter.onComplete();
}
});
}
});
}
public Observable<Item> getSearchResult(MyHttpCaller httpCaller, SomeSearchEngine searchEngine, String someSearchWord){
//Where everything comes together
Observable<String> searchResultObservable = getSearchResultKeys(searchEngine, someSearchWord);
retuern searchResultObservable
.observeOn(Schedulers.newThread())
.flatMap(new Function<String, Observable<Item>>(){
@Override
public Observable<Item> apply(String key){
return getItemByKey(httpCaller, key);
}
});
}
onComplete()
总是接到一次电话,然后流停止。 (这是 Observable Contract 的一部分)。
这意味着在您的情况下,您在 SomeMethodThatWantsItems
的 onComplete()
将在检索到 所有 项后被调用。
在 flatMap()
的情况下,每个内部 Observable
完成后,将简单地向源 Observable
发出信号,停止将项目从内部 Observable
扁平化到源 Observable
, flatMap()
只要此流发送项目,就会合并来自内部 Observable
的项目,因此它基本上将整个内部 Observable
流消耗到源流中,整个流直到终止 event3 就像 onComplete()
,因此如果内部 Observable
可以发射超过 1 个项目,这意味着它将在源流上发射超过 1 个。