多次订阅 RxJava observable
Subscribe on RxJava observable multiple times
我有一个关于 RxJava Observable 的问题。
例如,我有一个 Retrofit 接口,returns 我是 Observable 的。我需要对这个视频流做点什么。这是下载视频并将其保存到列表中的代码:
API.getVideoListObservable()
.doOnError(t -> t.printStackTrace())
.map(r -> r.getObjects())
.doOnNext(l -> VideoActivity.this.runOnUiThread(() -> fragment.updateVideoList(l)))
.doOnNext(l -> kalturaVideoList.addAll(l))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
API - 改装支架
如果想更新视频列表,我应该做同样的操作吗?或者我应该订阅,取消订阅然后再次订阅这个:
Subscription s = API.getVideoListObservable()
.doOnError(t -> t.printStackTrace())
.map(r -> r.getObjects())
.doOnNext(l -> VideoActivity.this.runOnUiThread(() -> fragment.updateVideoList(l)))
.doOnNext(l -> kalturaVideoList.addAll(l))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
//some code
...
s.unsubscribe();
s = null;
s = API.getVideoListObservable()
.doOnError(t -> t.printStackTrace())
.map(r -> r.getObjects())
.doOnNext(l -> VideoActivity.this.runOnUiThread(() -> fragment.updateVideoList(l)))
.doOnNext(l -> kalturaVideoList.addAll(l))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
你的整个事件流程是错误的,你应该在 subscriber 中更新 UI,而不是在 doOnNext 中。
API.getVideoListObservable()
.map(r -> r.getObjects())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(videos -> {
// do stuff with videos
fragment.updateVideoList(videos);
kalturaVideoList.addAll(videos);
}, throwable -> {
// handle errors
throwable.printStackTrace();
});
Retrofit 为您设置 subscribeOn(Schedulers.io())
。
将所有内容移动到一个函数中,并在需要更新列表时调用它。
不需要手动退订,因为订阅者在收到onCompleted或onError时会自动退订。
请注意,在 Android 上使用 RxJava 时,您必须处理 Activity/Fragment 生命周期和配置更改。您必须跟踪未决的网络呼叫,并在用户导航到另一个 Activity/Fragment.
时手动取消订阅
到目前为止,我解决所有 RxJava + Android 问题的首选方法是使用 https://github.com/evant/rxloader 库。使用 RxLoader 的代码将如下所示:
private Observable<List<Video>> getVideos() {
return API.getVideoListObservable()
.map(r -> r.getObjects());
}
// in onCreate()
RxLoaderManager loaderManager = RxLoaderManager.get(this);
RxLoader<List<Video>> videoLoader = loaderManager.create(
getVideos().observeOn(AndroidSchedulers.mainThread()),
new RxLoaderObserver<List<Video>>() {
@Override
public void onNext(List<Video> videos) {
// do stuff with videos
fragment.updateVideoList(videos);
kalturaVideoList.addAll(videos);
}
@Override
public void onError(Throwable throwable) {
// handle errors
throwable.printStackTrace();
}
});
// whenever you need to update videos
videoLoader.restart();
我有一个关于 RxJava Observable 的问题。 例如,我有一个 Retrofit 接口,returns 我是 Observable 的。我需要对这个视频流做点什么。这是下载视频并将其保存到列表中的代码:
API.getVideoListObservable()
.doOnError(t -> t.printStackTrace())
.map(r -> r.getObjects())
.doOnNext(l -> VideoActivity.this.runOnUiThread(() -> fragment.updateVideoList(l)))
.doOnNext(l -> kalturaVideoList.addAll(l))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
API - 改装支架
如果想更新视频列表,我应该做同样的操作吗?或者我应该订阅,取消订阅然后再次订阅这个:
Subscription s = API.getVideoListObservable()
.doOnError(t -> t.printStackTrace())
.map(r -> r.getObjects())
.doOnNext(l -> VideoActivity.this.runOnUiThread(() -> fragment.updateVideoList(l)))
.doOnNext(l -> kalturaVideoList.addAll(l))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
//some code
...
s.unsubscribe();
s = null;
s = API.getVideoListObservable()
.doOnError(t -> t.printStackTrace())
.map(r -> r.getObjects())
.doOnNext(l -> VideoActivity.this.runOnUiThread(() -> fragment.updateVideoList(l)))
.doOnNext(l -> kalturaVideoList.addAll(l))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
你的整个事件流程是错误的,你应该在 subscriber 中更新 UI,而不是在 doOnNext 中。
API.getVideoListObservable()
.map(r -> r.getObjects())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(videos -> {
// do stuff with videos
fragment.updateVideoList(videos);
kalturaVideoList.addAll(videos);
}, throwable -> {
// handle errors
throwable.printStackTrace();
});
Retrofit 为您设置 subscribeOn(Schedulers.io())
。
将所有内容移动到一个函数中,并在需要更新列表时调用它。 不需要手动退订,因为订阅者在收到onCompleted或onError时会自动退订。
请注意,在 Android 上使用 RxJava 时,您必须处理 Activity/Fragment 生命周期和配置更改。您必须跟踪未决的网络呼叫,并在用户导航到另一个 Activity/Fragment.
时手动取消订阅到目前为止,我解决所有 RxJava + Android 问题的首选方法是使用 https://github.com/evant/rxloader 库。使用 RxLoader 的代码将如下所示:
private Observable<List<Video>> getVideos() {
return API.getVideoListObservable()
.map(r -> r.getObjects());
}
// in onCreate()
RxLoaderManager loaderManager = RxLoaderManager.get(this);
RxLoader<List<Video>> videoLoader = loaderManager.create(
getVideos().observeOn(AndroidSchedulers.mainThread()),
new RxLoaderObserver<List<Video>>() {
@Override
public void onNext(List<Video> videos) {
// do stuff with videos
fragment.updateVideoList(videos);
kalturaVideoList.addAll(videos);
}
@Override
public void onError(Throwable throwable) {
// handle errors
throwable.printStackTrace();
}
});
// whenever you need to update videos
videoLoader.restart();