RxJava:如何重置长 运行 热可观察链?
RxJava: How can I reset a long running hot observable chain?
对于我的应用程序的搜索功能,我有一个执行以下操作的热可观察链。
- 接受用户输入的字符串到
EditText
(TextChangedEvent
)(在 mainThread
)
- 去抖动 300 毫秒(在
computation
线程上)
- 显示加载微调器(
mainThread
)
- 使用该字符串查询 SQL 数据库(此查询可能需要 100 毫秒到 2000 毫秒)(在
Schedulers.io()
)
- 向用户显示结果(
mainThread
)
由于第 3 步的长度变化很大,因此会出现竞争条件,即较新的搜索结果显示在较新的结果之上(有时)。假设用户想要键入 chicken
,但由于奇怪的键入速度,单词的第一部分在整个术语之前发出:
- 首先发送对
chick
的搜索,然后发送 chicken
。
- 说
chick
需要 1500ms
来执行,而 chicken
需要 300ms
来执行。
- 这会导致
chick
搜索结果无法正确显示搜索词 chicken
。这是因为 chicken
搜索先完成(仅用了 300 毫秒),然后是 chick
搜索(1500 毫秒)。
我该如何处理这种情况?
- 一旦用户通过
TextChangedEvent
触发新搜索,我就不关心旧搜索,即使它仍然是 运行。有什么办法可以取消旧的搜索吗?
完整的可观察代码:
subscription = WidgetObservable.text(searchText)
.debounce(300, TimeUnit.MILLISECONDS)
.observeOn(AndroidSchedulers.mainThread())
//do this on main thread because it's a UI element (cannot access a View from a background thread)
//get a String representing the new text entered in the EditText
.map(new Func1<OnTextChangeEvent, String>() {
@Override
public String call(OnTextChangeEvent onTextChangeEvent) {
return onTextChangeEvent.text().toString().trim();
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.doOnNext(new Action1<String>() {
@Override
public void call(String s) {
presenter.handleInput(s);
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.io())
.filter(new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
return s != null && s.length() >= 1 && !s.equals("");
}
}).doOnNext(new Action1<String>() {
@Override
public void call(String s) {
Timber.d("searching for string: '%s'", s);
}
})
//run SQL query and get a cursor for all the possible search results with the entered search term
.flatMap(new Func1<String, Observable<SearchBookmarkableAdapterViewModel>>() {
@Override
public Observable<SearchBookmarkableAdapterViewModel> call(String s) {
return presenter.getAdapterViewModelRx(s);
}
})
.subscribeOn(Schedulers.io())
//have the subscriber (the adapter) run on the main thread
.observeOn(AndroidSchedulers.mainThread())
//subscribe the adapter, which receives a stream containing a list of my search result objects and populates the view with them
.subscribe(new Subscriber<SearchBookmarkableAdapterViewModel>() {
@Override
public void onCompleted() {
Timber.v("Completed loading results");
}
@Override
public void onError(Throwable e) {
Timber.e(e, "Error loading results");
presenter.onNoResults();
//resubscribe so the observable keeps working.
subscribeSearchText();
}
@Override
public void onNext(SearchBookmarkableAdapterViewModel searchBookmarkableAdapterViewModel) {
Timber.v("Loading data with size: %d into adapter", searchBookmarkableAdapterViewModel.getSize());
adapter.loadDataIntoAdapter(searchBookmarkableAdapterViewModel);
final int resultCount = searchBookmarkableAdapterViewModel.getSize();
if (resultCount == 0)
presenter.onNoResults();
else
presenter.onResults();
}
});
使用 switchMap 而不是 flatMap
。这将导致它在您开始新查询时丢弃*先前的查询。
*这是如何工作的:
每当外部源 observable 产生新值时,switchMap
调用您的选择器以 return 一个新的内部 observable(在本例中为 presenter.getAdapterViewModelRx(s)
)。 switchMap
然后 取消订阅 从它正在监听的前一个内部 Observable 并 订阅 到新的。
取消订阅之前的内部可观察对象有两个影响:
Observable 产生的任何通知(值、完成、错误等)都将被默默地忽略并丢弃。
Observable 将被通知其观察者已取消订阅,并且可以选择采取措施取消它所代表的任何异步进程。
您放弃的查询是否真的被取消完全取决于 presenter.getAdapterViewModelRx()
的实施。理想情况下,它们将被取消以避免不必要地浪费服务器资源。但即使他们保留 运行,上面的 #1 也会阻止您的预输入代码看到陈旧的结果。
对于我的应用程序的搜索功能,我有一个执行以下操作的热可观察链。
- 接受用户输入的字符串到
EditText
(TextChangedEvent
)(在mainThread
) - 去抖动 300 毫秒(在
computation
线程上) - 显示加载微调器(
mainThread
) - 使用该字符串查询 SQL 数据库(此查询可能需要 100 毫秒到 2000 毫秒)(在
Schedulers.io()
) - 向用户显示结果(
mainThread
)
由于第 3 步的长度变化很大,因此会出现竞争条件,即较新的搜索结果显示在较新的结果之上(有时)。假设用户想要键入 chicken
,但由于奇怪的键入速度,单词的第一部分在整个术语之前发出:
- 首先发送对
chick
的搜索,然后发送chicken
。 - 说
chick
需要1500ms
来执行,而chicken
需要300ms
来执行。 - 这会导致
chick
搜索结果无法正确显示搜索词chicken
。这是因为chicken
搜索先完成(仅用了 300 毫秒),然后是chick
搜索(1500 毫秒)。
我该如何处理这种情况?
- 一旦用户通过
TextChangedEvent
触发新搜索,我就不关心旧搜索,即使它仍然是 运行。有什么办法可以取消旧的搜索吗?
完整的可观察代码:
subscription = WidgetObservable.text(searchText)
.debounce(300, TimeUnit.MILLISECONDS)
.observeOn(AndroidSchedulers.mainThread())
//do this on main thread because it's a UI element (cannot access a View from a background thread)
//get a String representing the new text entered in the EditText
.map(new Func1<OnTextChangeEvent, String>() {
@Override
public String call(OnTextChangeEvent onTextChangeEvent) {
return onTextChangeEvent.text().toString().trim();
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.doOnNext(new Action1<String>() {
@Override
public void call(String s) {
presenter.handleInput(s);
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.io())
.filter(new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
return s != null && s.length() >= 1 && !s.equals("");
}
}).doOnNext(new Action1<String>() {
@Override
public void call(String s) {
Timber.d("searching for string: '%s'", s);
}
})
//run SQL query and get a cursor for all the possible search results with the entered search term
.flatMap(new Func1<String, Observable<SearchBookmarkableAdapterViewModel>>() {
@Override
public Observable<SearchBookmarkableAdapterViewModel> call(String s) {
return presenter.getAdapterViewModelRx(s);
}
})
.subscribeOn(Schedulers.io())
//have the subscriber (the adapter) run on the main thread
.observeOn(AndroidSchedulers.mainThread())
//subscribe the adapter, which receives a stream containing a list of my search result objects and populates the view with them
.subscribe(new Subscriber<SearchBookmarkableAdapterViewModel>() {
@Override
public void onCompleted() {
Timber.v("Completed loading results");
}
@Override
public void onError(Throwable e) {
Timber.e(e, "Error loading results");
presenter.onNoResults();
//resubscribe so the observable keeps working.
subscribeSearchText();
}
@Override
public void onNext(SearchBookmarkableAdapterViewModel searchBookmarkableAdapterViewModel) {
Timber.v("Loading data with size: %d into adapter", searchBookmarkableAdapterViewModel.getSize());
adapter.loadDataIntoAdapter(searchBookmarkableAdapterViewModel);
final int resultCount = searchBookmarkableAdapterViewModel.getSize();
if (resultCount == 0)
presenter.onNoResults();
else
presenter.onResults();
}
});
使用 switchMap 而不是 flatMap
。这将导致它在您开始新查询时丢弃*先前的查询。
*这是如何工作的:
每当外部源 observable 产生新值时,switchMap
调用您的选择器以 return 一个新的内部 observable(在本例中为 presenter.getAdapterViewModelRx(s)
)。 switchMap
然后 取消订阅 从它正在监听的前一个内部 Observable 并 订阅 到新的。
取消订阅之前的内部可观察对象有两个影响:
Observable 产生的任何通知(值、完成、错误等)都将被默默地忽略并丢弃。
Observable 将被通知其观察者已取消订阅,并且可以选择采取措施取消它所代表的任何异步进程。
您放弃的查询是否真的被取消完全取决于 presenter.getAdapterViewModelRx()
的实施。理想情况下,它们将被取消以避免不必要地浪费服务器资源。但即使他们保留 运行,上面的 #1 也会阻止您的预输入代码看到陈旧的结果。