RxJava2: How to share an observable with two other observers and merge the results back into one subscriber
I have the following methods:
Document createDocument(String url);
List<MediaContent> getVideo(Document doc);
List<MediaContent> getImages(Document doc);
The resulting List<MediaContent> will be consumed by
void appendToRv(List<MediaContent> media);
I would like to use RxJava2 to do the following:
CreateDocument -> getVideo  ->
                               -> appendToRv
               -> getImages ->
(Also, the video output should come before the images.)
How can I do this? I tried flatMap, but it only seems to let me chain a single method.
Single<List<MediaContent>> single =
    Single.fromCallable(() -> createDocument(url))
        // . ?? ..
        // this is the part i am lost with
        // how do i feed document to -> getVideo() and getImage()
        // and then merge them back into the subscriber
        //
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
DisposableSingleObserver<List<MediaContent>> parseImageSubscription =
    new DisposableSingleObserver<List<MediaContent>>() {
        @Override
        public void onSuccess(List<MediaContent> media) {
            if (media != null) {
                appendToRv(media);
            }
        }
        @Override
        public void onError(Throwable error) {
            doSnackBar("error loading: '" + q + "'");
        }
    };
single.subscribe(parseImageSubscription);
Single observables for getVideo and getImages:
Single<List<MediaContent>> SingleGetImage(Document document) {
    return Single.create(e -> {
        // getImages is the method declared above (the original called getImage)
        List<MediaContent> result = getImages(document);
        if (result != null) {
            e.onSuccess(result);
        } else {
            e.onError(new Exception("No images found"));
        }
    });
}
Single<List<MediaContent>> singleGetVideo(Document document) {
    return Single.create(e -> {
        List<MediaContent> result = getVideo(document);
        if (result != null) {
            e.onSuccess(result);
        } else {
            e.onError(new Exception("No videos found"));
        }
    });
}
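Assuming you want to run the getVideo and getImages requests in parallel, you can use flatMap() together with zip(). zip collects the results from both Singles and lets you combine the two results into a new value, which means you can sort the video MediaContent list, combine it with the image MediaContent list, and return a unified list (or any other object you want). Note that zip by itself does not introduce concurrency: for the two Singles to actually run in parallel, each one needs its own subscribeOn(); otherwise they run one after the other on the thread chosen by the outer subscribeOn().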
Single<List<MediaContent>> single =
    Single.fromCallable(() -> createDocument(url))
        .flatMap(document -> Single.zip(
            singleGetVideo(document),
            SingleGetImage(document),
            (videoMediaContents, imageMediaContents) -> {
                // here you have both results; sort, combine, etc. and return a unified object
                // (one possible combination: videos first, then images; needs java.util.ArrayList)
                List<MediaContent> merged = new ArrayList<>(videoMediaContents);
                merged.addAll(imageMediaContents);
                return merged;
            }))
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
single.subscribe(parseImageSubscription);
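The combiner passed to zip is just a function of the two result lists, so the "videos before images" requirement can live in a small helper of its own. A minimal sketch, assuming a hypothetical class MediaMerger with a method mergeVideosFirst (neither is part of RxJava or of the question's code); it is generic so that it works with MediaContent or any other element type:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of RxJava or the original question's code.
final class MediaMerger {
    private MediaMerger() {}

    // Returns a new list holding every video followed by every image.
    // Neither input list is modified.
    static <T> List<T> mergeVideosFirst(List<T> videos, List<T> images) {
        List<T> merged = new ArrayList<>(videos.size() + images.size());
        merged.addAll(videos);
        merged.addAll(images);
        return merged;
    }
}
```

Inside the zip combiner this could be called as (videos, images) -> MediaMerger.mergeVideosFirst(videos, images). Keeping the ordering rule in one plain method also makes it easy to unit test without any schedulers involved.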
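Observable.zip() handles this perfectly. The observer receives the merged result through this method. (The example uses the RxJava 1 API; in RxJava 2, Func2 becomes BiFunction with apply(), and onCompleted() becomes onComplete().)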
public void zip() {
    Observable<Integer> observable1 = Observable.just(1);
    Observable<Integer> observable2 = Observable.just(2);
    Observable.zip(observable1, observable2, new Func2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer integer, Integer integer2) {
            return integer + integer2;
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {
        }
        @Override
        public void onError(Throwable e) {
        }
        @Override
        public void onNext(Integer o) {
            Logger.i(o.toString());
            //Here will print 3.
        }
    });
}
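To make zip's pairing rule concrete, here is a plain-Java model that works on lists instead of Observables. It only illustrates the semantics (the i-th item of one source is combined with the i-th item of the other, and the output ends when the shorter source ends); it is not how RxJava implements zip, and the class name ZipModel and its zip method are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical list-based model of zip's pairing semantics (not RxJava code).
final class ZipModel {
    private ZipModel() {}

    // Combines the i-th element of 'first' with the i-th element of 'second'.
    // The result has as many elements as the shorter input.
    static <A, B, R> List<R> zip(List<A> first, List<B> second,
                                 BiFunction<? super A, ? super B, ? extends R> combiner) {
        int n = Math.min(first.size(), second.size());
        List<R> out = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            out.add(combiner.apply(first.get(i), second.get(i)));
        }
        return out;
    }
}
```

With single-element inputs 1 and 2 and an addition combiner, the model produces a one-element list containing 3, mirroring the example above. A Single always carries exactly one item, so with Single.zip the combiner runs exactly once.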