根据结果链接可观察对象
Chaining observables based on result
我是 rx-java 和 rx-android 的完全初学者。我听说一开始学习曲线很陡峭。
我正在尝试使用 rx-android.
将所有基于 Eventbus 的代码替换为类型更安全的替代方案
我已设置此代码段以根据编辑文本文本更改事件创建可观察对象:
MainActivity
RxUtils.createEditTextChangeObservable(txtInput).throttleLast(200, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread()).subscribe(new Action1<EditText>() {
@Override
public void call(EditText editText) {
searchStopResultFragment.query(editText.getText().toString());
}
});
RxUtils:
public static Observable<EditText> createEditTextChangeObservable(final EditText editText){
return Observable.create(new Observable.OnSubscribe<EditText>() {
@Override
public void call(final Subscriber<? super EditText> subscriber) {
editText.addTextChangedListener(new TextWatcher() {
@Override
public void beforeTextChanged(CharSequence s, int start, int count, int after) {
}
@Override
public void onTextChanged(CharSequence s, int start, int before, int count) {
}
@Override
public void afterTextChanged(Editable s) {
if (subscriber.isUnsubscribed()) return;
subscriber.onNext(editText);
}
});
}
});
}
SearchStopResultFragment:
public void query(String query){
lastQuery = query;
resultObservable = StopProvider.getStopResultObservable(getActivity().getContentResolver(),query);
subscription = resultObservable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<List<Stop>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(List<Stop> stops) {
if(!lastQuery.equals("")) {
if(stops.size()>0) {
ArrayList<AdapterItem> items = adapter.getItems();
items.clear();
for (Stop stop : stops) {
SearchResultStopItem item = new SearchResultStopItem(stop, SearchResultStopItem.STOP);
items.add(item);
}
adapter.setItems(items);
adapter.notifyDataSetChanged();
}else{
//DO A NOTHER ASYNC QUERY TO FETCH RESULTS
}
}else{
showStartItems();
}
}
});
}
感觉我做错了。我在每个文本更改事件中从片段中的查询方法创建新的可观察对象。我还想根据 StopProvider.getStopResultObservable
中的结果创建一个新的异步查找操作(参见评论)
有什么想法吗?
使用 concatmap 解决它并结合最新的:
RxUtils.createEditTextChangeObservable(txtInput).throttleLast(200, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread()).concatMap(new Func1<EditText, Observable<Pair<String,List<Stop>>>>() {
@Override
public Observable<Pair<String, List<Stop>>> call(EditText editText) {
String query = editText.getText().toString();
//searchStopResultFragment.setLastQuery(query);
if(query.isEmpty()){
return Observable.just(null);
}
return Observable.combineLatest(StopProvider.getStopResultObservable(getContentResolver(), query), Observable.just(query), new Func2<List<Stop>, String, Pair<String, List<Stop>>>() {
@Override
public Pair<String, List<Stop>> call(List<Stop> stops, String s) {
return new Pair(s,stops);
}
});
}
}).concatMap(new Func1<Pair<String, List<Stop>>, Observable<List<Stop>>>() {
@Override
public Observable<List<Stop>> call(Pair<String, List<Stop>> queryAndStops) {
if(queryAndStops!=null) {
if (queryAndStops.second.size() == 0) {
return RestClient.service().locationName(queryAndStops.first).concatMap(new Func1<LocationNameResponse, Observable<? extends List<Stop>>>() {
@Override
public Observable<? extends List<Stop>> call(LocationNameResponse locationNameResponse) {
return Observable.just(locationNameResponse.getAddresses());
}
});
} else {
return Observable.just(queryAndStops.second);
}
}
return Observable.just(null);
}
}).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread()).compose(this.<List<Stop>>bindToLifecycle()).subscribe(new Action1<List<Stop>>() {
@Override
public void call(List<Stop> stops) {
if (stops != null) {
searchStopResultFragment.showStops(stops);
}else{
searchStopResultFragment.showStartItems();
}
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
showError(throwable);
}
});
然而,有没有更好的方法可以在不发送 Observable.just(null) 的情况下跳出链并在下一次调用时检查空值?
您可以将第二个 concatMap 移动到您唯一需要它的地方 - 在 combineLatest 之后
RxUtils.createEditTextChangeObservable(txtInput)
.throttleLast(200, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread())
.concatMap(new Func1<EditText, Observable<Pair<String, List<Stop>>>>() {
@Override
public Observable<Pair<String, List<Stop>>> call(EditText editText) {
String query = editText.getText().toString();
//searchStopResultFragment.setLastQuery(query);
if (query.isEmpty()) {
return Observable.just(null);
}
return Observable
.combineLatest(StopProvider.getStopResultObservable(getContentResolver(), query), Observable.just(query), new Func2<List<Stop>, String, Pair<String, List<Stop>>>() {
@Override
public Pair<String, List<Stop>> call(List<Stop> stops, String s) {
return new Pair(s, stops);
}
})
.concatMap(new Func1<R, Observable<? extends Pair<String, List<Stop>>>>() {
@Override
public Observable<? extends Pair<String, List<Stop>>> call(R r) {
if (queryAndStops.second.size() == 0) {
return RestClient.service().locationName(queryAndStops.first).concatMap(new Func1<LocationNameResponse, Observable<? extends List<Stop>>>() {
@Override
public Observable<? extends List<Stop>> call(LocationNameResponse locationNameResponse) {
return Observable.just(locationNameResponse.getAddresses());
}
});
} else {
return Observable.just(queryAndStops.second);
}
}
});
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread()).compose(this.<List<Stop>>bindToLifecycle())
.subscribe(new Action1<List<Stop>>() {
@Override
public void call(List<Stop> stops) {
if (stops != null) {
searchStopResultFragment.showStops(stops);
} else {
searchStopResultFragment.showStartItems();
}
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
showError(throwable);
}
});
这是我想出的:
RxUtils.createEditTextChangeObservable(txtInput)
.throttleLast(200, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread())
.map(EXTRACT_STRING)
.filter(STRING_IS_NOT_EMPTY)
.concatMap(new Func1<EditText, Observable<Pair<String,List<Stop>>>>() {
@Override
public Observable<Pair<String, List<Stop>>> call(final String query) {
return StopProvider.getStopResultObservable(getContentResolver(), query)
.map(new Func1<List<Stop>, Pair<String, List<Stop>>>() {
// I think this map is a bit more readable than the
// combineLatest, and since "query" should not be changing
// anyway, the result should be the same (you have to
// declare it as final in the method signature, though
@Override
public Pair<String, List<Stop>> call(List<Stop> stops) {
return new Pair(query, stops);
}
});
}
)
.concatMap(new Func1<Pair<String, List<Stop>>, Observable<List<Stop>>>() {
@Override
public Observable<List<Stop>> call(Pair<String, List<Stop>> queryAndStops) {
if (queryAndStops.second.size() == 0) {
return RestClient.service().locationName(queryAndStops.first)
.map(new Func1<LocationNameResponse, List<Stop>>() {
@Override
public List<Stop> call(LocationNameResponse locationNameResponse) {
// since there was no if-else in your original code (you were always
// just wrapping the List in an Observable) I removed that, too
return locationNameResponse.getAddresses();
}
});
} else {
return Observable.just(queryAndStops.second);
}
}
)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.compose(this.<List<Stop>>bindToLifecycle())
.subscribe(new Action1<List<Stop>>() {
@Override
public void call(List<Stop> stops) {
// since I don't know what your API is returning I think
// it's saver to keep this check in:
if (stops != null) {
searchStopResultFragment.showStops(stops);
} else {
searchStopResultFragment.showStartItems();
}
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
showError(throwable);
}
});
其中:
public static final Func1<EditText, String> EXTRACT_STRING = new Func1<EditText, String>() {
@Override
public void String call(EditText editText) {
return editText.getText().toString();
}
};
public static final Func1<String, Boolean> STRING_IS_NOT_EMPTY = new Func1<String, Boolean>() {
@Override
public void String call(String string) {
return !string.isEmpty();
}
};
因此,这至少消除了 return Observable.just(null)
然后在链中检查它的需要。
我是 rx-java 和 rx-android 的完全初学者。我听说一开始学习曲线很陡峭。
我正在尝试使用 rx-android.
将所有基于 Eventbus 的代码替换为类型更安全的替代方案我已设置此代码段以根据编辑文本文本更改事件创建可观察对象:
MainActivity
RxUtils.createEditTextChangeObservable(txtInput).throttleLast(200, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread()).subscribe(new Action1<EditText>() {
@Override
public void call(EditText editText) {
searchStopResultFragment.query(editText.getText().toString());
}
});
RxUtils:
public static Observable<EditText> createEditTextChangeObservable(final EditText editText){
return Observable.create(new Observable.OnSubscribe<EditText>() {
@Override
public void call(final Subscriber<? super EditText> subscriber) {
editText.addTextChangedListener(new TextWatcher() {
@Override
public void beforeTextChanged(CharSequence s, int start, int count, int after) {
}
@Override
public void onTextChanged(CharSequence s, int start, int before, int count) {
}
@Override
public void afterTextChanged(Editable s) {
if (subscriber.isUnsubscribed()) return;
subscriber.onNext(editText);
}
});
}
});
}
SearchStopResultFragment:
public void query(String query){
lastQuery = query;
resultObservable = StopProvider.getStopResultObservable(getActivity().getContentResolver(),query);
subscription = resultObservable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<List<Stop>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(List<Stop> stops) {
if(!lastQuery.equals("")) {
if(stops.size()>0) {
ArrayList<AdapterItem> items = adapter.getItems();
items.clear();
for (Stop stop : stops) {
SearchResultStopItem item = new SearchResultStopItem(stop, SearchResultStopItem.STOP);
items.add(item);
}
adapter.setItems(items);
adapter.notifyDataSetChanged();
}else{
//DO A NOTHER ASYNC QUERY TO FETCH RESULTS
}
}else{
showStartItems();
}
}
});
}
感觉我做错了。我在每个文本更改事件中从片段中的查询方法创建新的可观察对象。我还想根据 StopProvider.getStopResultObservable
中的结果创建一个新的异步查找操作(参见评论)
有什么想法吗?
使用 concatmap 解决它并结合最新的:
RxUtils.createEditTextChangeObservable(txtInput).throttleLast(200, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread()).concatMap(new Func1<EditText, Observable<Pair<String,List<Stop>>>>() {
@Override
public Observable<Pair<String, List<Stop>>> call(EditText editText) {
String query = editText.getText().toString();
//searchStopResultFragment.setLastQuery(query);
if(query.isEmpty()){
return Observable.just(null);
}
return Observable.combineLatest(StopProvider.getStopResultObservable(getContentResolver(), query), Observable.just(query), new Func2<List<Stop>, String, Pair<String, List<Stop>>>() {
@Override
public Pair<String, List<Stop>> call(List<Stop> stops, String s) {
return new Pair(s,stops);
}
});
}
}).concatMap(new Func1<Pair<String, List<Stop>>, Observable<List<Stop>>>() {
@Override
public Observable<List<Stop>> call(Pair<String, List<Stop>> queryAndStops) {
if(queryAndStops!=null) {
if (queryAndStops.second.size() == 0) {
return RestClient.service().locationName(queryAndStops.first).concatMap(new Func1<LocationNameResponse, Observable<? extends List<Stop>>>() {
@Override
public Observable<? extends List<Stop>> call(LocationNameResponse locationNameResponse) {
return Observable.just(locationNameResponse.getAddresses());
}
});
} else {
return Observable.just(queryAndStops.second);
}
}
return Observable.just(null);
}
}).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread()).compose(this.<List<Stop>>bindToLifecycle()).subscribe(new Action1<List<Stop>>() {
@Override
public void call(List<Stop> stops) {
if (stops != null) {
searchStopResultFragment.showStops(stops);
}else{
searchStopResultFragment.showStartItems();
}
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
showError(throwable);
}
});
然而,有没有更好的方法可以在不发送 Observable.just(null) 的情况下跳出链并在下一次调用时检查空值?
您可以将第二个 concatMap 移动到您唯一需要它的地方 - 在 combineLatest 之后
RxUtils.createEditTextChangeObservable(txtInput)
.throttleLast(200, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread())
.concatMap(new Func1<EditText, Observable<Pair<String, List<Stop>>>>() {
@Override
public Observable<Pair<String, List<Stop>>> call(EditText editText) {
String query = editText.getText().toString();
//searchStopResultFragment.setLastQuery(query);
if (query.isEmpty()) {
return Observable.just(null);
}
return Observable
.combineLatest(StopProvider.getStopResultObservable(getContentResolver(), query), Observable.just(query), new Func2<List<Stop>, String, Pair<String, List<Stop>>>() {
@Override
public Pair<String, List<Stop>> call(List<Stop> stops, String s) {
return new Pair(s, stops);
}
})
.concatMap(new Func1<R, Observable<? extends Pair<String, List<Stop>>>>() {
@Override
public Observable<? extends Pair<String, List<Stop>>> call(R r) {
if (queryAndStops.second.size() == 0) {
return RestClient.service().locationName(queryAndStops.first).concatMap(new Func1<LocationNameResponse, Observable<? extends List<Stop>>>() {
@Override
public Observable<? extends List<Stop>> call(LocationNameResponse locationNameResponse) {
return Observable.just(locationNameResponse.getAddresses());
}
});
} else {
return Observable.just(queryAndStops.second);
}
}
});
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread()).compose(this.<List<Stop>>bindToLifecycle())
.subscribe(new Action1<List<Stop>>() {
@Override
public void call(List<Stop> stops) {
if (stops != null) {
searchStopResultFragment.showStops(stops);
} else {
searchStopResultFragment.showStartItems();
}
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
showError(throwable);
}
});
这是我想出的:
RxUtils.createEditTextChangeObservable(txtInput)
.throttleLast(200, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread())
.map(EXTRACT_STRING)
.filter(STRING_IS_NOT_EMPTY)
.concatMap(new Func1<EditText, Observable<Pair<String,List<Stop>>>>() {
@Override
public Observable<Pair<String, List<Stop>>> call(final String query) {
return StopProvider.getStopResultObservable(getContentResolver(), query)
.map(new Func1<List<Stop>, Pair<String, List<Stop>>>() {
// I think this map is a bit more readable than the
// combineLatest, and since "query" should not be changing
// anyway, the result should be the same (you have to
// declare it as final in the method signature, though
@Override
public Pair<String, List<Stop>> call(List<Stop> stops) {
return new Pair(query, stops);
}
});
}
)
.concatMap(new Func1<Pair<String, List<Stop>>, Observable<List<Stop>>>() {
@Override
public Observable<List<Stop>> call(Pair<String, List<Stop>> queryAndStops) {
if (queryAndStops.second.size() == 0) {
return RestClient.service().locationName(queryAndStops.first)
.map(new Func1<LocationNameResponse, List<Stop>>() {
@Override
public List<Stop> call(LocationNameResponse locationNameResponse) {
// since there was no if-else in your original code (you were always
// just wrapping the List in an Observable) I removed that, too
return locationNameResponse.getAddresses();
}
});
} else {
return Observable.just(queryAndStops.second);
}
}
)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.compose(this.<List<Stop>>bindToLifecycle())
.subscribe(new Action1<List<Stop>>() {
@Override
public void call(List<Stop> stops) {
// since I don't know what your API is returning I think
// it's saver to keep this check in:
if (stops != null) {
searchStopResultFragment.showStops(stops);
} else {
searchStopResultFragment.showStartItems();
}
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
showError(throwable);
}
});
其中:
public static final Func1<EditText, String> EXTRACT_STRING = new Func1<EditText, String>() {
@Override
public void String call(EditText editText) {
return editText.getText().toString();
}
};
public static final Func1<String, Boolean> STRING_IS_NOT_EMPTY = new Func1<String, Boolean>() {
@Override
public void String call(String string) {
return !string.isEmpty();
}
};
因此,这至少消除了 return Observable.just(null)
然后在链中检查它的需要。