onNext of subscribe does not emit items after using the zipWith operator in RxJava?
Main POJO:
class VideoResponse{
List<VideoFiles> videosFiles;
}
I have the following scenario: I combine the results of two database operations and return them as an Observable<List<VideoResponse>>.
##UPDATE##
mDbHelper
/* getVideoCategory() returns Observable<List<VideoResponse>> */
.getVideoCategory()
.flatMapIterable(videoResponses -> videoResponses)
.doOnNext(videoResponse -> {
Timber.d("Flatmap Iterable Thread == > %s", Thread.currentThread().getName());})
.concatMap((Function<VideoResponse, ObservableSource<VideoResponse>>) videoResponse -> {
Integer videoId = videoResponse.getId();
return Observable.fromCallable(() -> videoResponse)
.doOnNext(videoResponse1 -> {
Timber.d("Thread before ZIP WITH ===> %s", Thread.currentThread().getName());
})
/* getVideoFilesList(int videoId) returns Observable<List<VideoFiles>> */
.zipWith(getVideoFilesList(videoId),
(videoResponse1, videoFiles) -> {
videoResponse1.setFiles(videoFiles);
return videoResponse1;
})
.doOnNext(vResponse -> {
Timber.d("Thread After ZIP WITH ===> %s", Thread.currentThread().getName());
})
/* This gets printed */
.doOnComplete(()->{
Timber.d(" OnComplete Thread for Video Files ===> %s ",Thread.currentThread().getName());
});
})
.toList()
.toObservable()
/* The print statement below is never executed */
.doOnComplete(()->{
Timber.d(" Thread doOnComplete");
})
.doOnNext(videoResponses -> {
Timber.d("Thread after loading from the LOCAL DB ====> %s", Thread.currentThread().getName());
});
These are the scheduler threads it runs on:
Flatmap Iterable Thread == > RxCachedThreadScheduler-1
Thread before ZIP WITH ===> RxCachedThreadScheduler-1
Flatmap Iterable Thread == > RxCachedThreadScheduler-1
Thread After ZIP WITH ===> RxCachedThreadScheduler-2
Thread before ZIP WITH ===> RxCachedThreadScheduler-2
Thread After ZIP WITH ===> RxCachedThreadScheduler-2
The final onNext never gets executed. I need the List to be delivered in onNext.
I have tried placing observeOn at different positions, but nothing seems to work. Any suggestions?
##UPDATE##
Using SqlBrite,
@Override
public Observable<List<VideoResponse>> getVideoCategory() {
return mDBHelper
.createQuery(VideoEntry.TABLE_NAME,
DbUtils.getSelectAllQuery(VideoEntry.TABLE_NAME))
.mapToOne(DbUtils::videosFromCursor);
}
@Override
public Observable<List<VideoFiles>> getVideoFilesList(int videoId) {
return mDBHelper.createQuery(VideoDetailsEntry.TABLE_NAME,
DbUtils.getSelectFromId(VideoDetailsEntry.TABLE_NAME,VideoDetailsEntry.COLUMN_VIDEO_ID),
String.valueOf(videoId))
.mapToOne(DbUtils::videoDetailsFromCursor);
}
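As @BobDalgleish hinted, onComplete was not being called before toList; it was only being called after zipWith. I have now made the following change, and I get the complete list from the DB. I use concatMap to preserve the order and wait for completion.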
mLocalDataSource
.getVideoCategory()
.compose(RxUtils.applySchedulers())
.flatMap(new Function<List<VideoResponse>,
ObservableSource<List<VideoResponse>>>() {
@Override
public ObservableSource<List<VideoResponse>> apply(List<VideoResponse> videoResponses) throws Exception {
return Observable.just(videoResponses)
.concatMap(videoResponses1 -> Observable.fromIterable(videoResponses1)
.concatMap(videoResponse -> Observable.just(videoResponse)
.concatMap(videoResponse1 -> {
Integer videoId = videoResponse1.getId();
return Observable.just(videoResponse1)
.zipWith(getVideoFilesList(videoId), new BiFunction<VideoResponse, List<VideoFiles>, VideoResponse>() {
@Override
public VideoResponse apply(VideoResponse videoResponse1, List<VideoFiles> videoFiles) throws Exception {
videoResponse1.setFiles(videoFiles);
Timber.d("Video Responses == > %s",videoResponse1);
return videoResponse1;
}
});
})))
.toList()
.toObservable()
.observeOn(AndroidSchedulers.mainThread());
}
});
I know this looks a bit messy! If you have any suggestions or optimizations, please post them.
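To make the point about onComplete and toList concrete, here is a minimal plain-Java sketch. It is not RxJava code: ListCollector and run are hypothetical names I made up to model a toList-style operator, assuming only that such an operator buffers items and hands the list downstream when the upstream signals completion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ToListModel {

    /** Hypothetical stand-in for a toList()-style operator; not RxJava's implementation. */
    static final class ListCollector<T> {
        private final List<T> buffer = new ArrayList<>();
        private final Consumer<List<T>> downstream;

        ListCollector(Consumer<List<T>> downstream) {
            this.downstream = downstream;
        }

        // Items are only buffered here; nothing reaches downstream yet.
        void onNext(T item) {
            buffer.add(item);
        }

        // The buffered list is handed downstream only when the upstream completes.
        void onComplete() {
            downstream.accept(new ArrayList<>(buffer));
        }
    }

    /** Feeds two items, optionally signals completion, and returns what downstream received. */
    static List<List<String>> run(boolean upstreamCompletes) {
        List<List<String>> received = new ArrayList<>();
        ListCollector<String> collector = new ListCollector<String>(received::add);
        collector.onNext("video-1");
        collector.onNext("video-2");
        if (upstreamCompletes) {
            collector.onComplete();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println("upstream never completes: " + run(false));
        System.out.println("upstream completes:       " + run(true));
    }
}
```

With run(false) the downstream receives nothing, which mirrors the first version where the final doOnNext never fired. With run(true) it receives one list holding both items, which is what wrapping each emitted list in Observable.just(...) achieves in the second version, since just completes after its single item.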