Rx concatWith() return 只有第一个 Flowable 结果
Rx concatWith() return only first Flowable result
我已经发布了它们单独工作的所有方法,但我遇到了第一个问题,其中我 concatWith() 两个可流动对象
return userFavouriteStores()
.concatWith(userOtherStores())
.doOnNext(new Consumer<List<StoreModel>>() {
@Override
public void accept(@io.reactivex.annotations.NonNull List<StoreModel> storeModels) throws Exception {
Log.i("storeModels", "" + storeModels);
}
})
public Flowable<List<StoreModel>> userFavouriteStores() {
return userStores()
.map(UserStores::favoriteStores)
.flatMap(storeId -> storeDao.storesWithIds(storeId))
.map(stores -> { // TODO Konvert to Kotlin map {}
List<StoreModel> result = new ArrayList<>(stores.size());
for (se.ica.handla.repositories.local.Store store : stores) {
result.add(store.toStoreModel(StoreModel.Source.Favourite));
}
return result;
}); }
public Flowable<List<StoreModel>> userOtherStores() {
return userStores().map(UserStores::otherStores)
.flatMap(storeId -> storeDao.storesWithIds(storeId))
.map(stores -> {
List<StoreModel> result = new ArrayList<>(stores.size());
for (Store store : stores) {
result.add(store.toStoreModel(StoreModel.Source.Other));
}
return result;
});}
更新方法:userStores() 用于最喜欢的商店和其他商店,
private Flowable<UserStores> userStores() {
return apiIcaSeResource
.userStores()
.toFlowable(); }
@GET("user/stores")
Single<UserStores> userStores();
在跟进评论和其他信息之后,您对 concat()
没有特别的问题,我假设它是有效的,它只是不是您想要实现的工具这里。
concat()
不会将两个列表连接成一个列表,但 rathe 将首先发出第一个 Flowable
的所有项目,然后才发出第二个 Flowable
的项目(因此你必须有 onComplete
所以 concat 会知道什么时候 Flowable
结束,我一开始就问了。
为了将列表组合在一起,我建议压缩两个存储 Obesrvable
s(收藏夹/其他),然后简单地组合到列表以获得组合列表的单个输出。
除此之外,正如您所指出的,由于两个商店 Observable
s 来自 userStores()
,您将调用网络请求两次,这绝对没有必要。您可以使用 publish()
来解决它,这会将网络结果共享并多播到两个 Observable
,从而产生单个网络请求。
总而言之,我宁愿推荐在这里使用 Single,而不是 Flowable
,因为您没有背压奉献。类似于以下实现:
Observable<List<StoreModel>> publish = userStores()
.toObservable()
.publish(userStores ->
Single.zip(
userFavouriteStores(userStores.singleOrError()),
userOtherStores(userStores.singleOrError()),
(favoriteStores, otherStores) -> {
favoriteStores.addAll(otherStores);
return favoriteStores;
}
)
.toObservable()
);
我已经发布了它们单独工作的所有方法,但我遇到了第一个问题,其中我 concatWith() 两个可流动对象
return userFavouriteStores()
.concatWith(userOtherStores())
.doOnNext(new Consumer<List<StoreModel>>() {
@Override
public void accept(@io.reactivex.annotations.NonNull List<StoreModel> storeModels) throws Exception {
Log.i("storeModels", "" + storeModels);
}
})
public Flowable<List<StoreModel>> userFavouriteStores() {
return userStores()
.map(UserStores::favoriteStores)
.flatMap(storeId -> storeDao.storesWithIds(storeId))
.map(stores -> { // TODO Konvert to Kotlin map {}
List<StoreModel> result = new ArrayList<>(stores.size());
for (se.ica.handla.repositories.local.Store store : stores) {
result.add(store.toStoreModel(StoreModel.Source.Favourite));
}
return result;
}); }
public Flowable<List<StoreModel>> userOtherStores() {
return userStores().map(UserStores::otherStores)
.flatMap(storeId -> storeDao.storesWithIds(storeId))
.map(stores -> {
List<StoreModel> result = new ArrayList<>(stores.size());
for (Store store : stores) {
result.add(store.toStoreModel(StoreModel.Source.Other));
}
return result;
});}
更新方法:userStores() 用于最喜欢的商店和其他商店,
private Flowable<UserStores> userStores() {
return apiIcaSeResource
.userStores()
.toFlowable(); }
@GET("user/stores")
Single<UserStores> userStores();
在跟进评论和其他信息之后,您对 concat()
没有特别的问题,我假设它是有效的,它只是不是您想要实现的工具这里。
concat()
不会将两个列表连接成一个列表,但 rathe 将首先发出第一个 Flowable
的所有项目,然后才发出第二个 Flowable
的项目(因此你必须有 onComplete
所以 concat 会知道什么时候 Flowable
结束,我一开始就问了。
为了将列表组合在一起,我建议压缩两个存储 Obesrvable
s(收藏夹/其他),然后简单地组合到列表以获得组合列表的单个输出。
除此之外,正如您所指出的,由于两个商店 Observable
s 来自 userStores()
,您将调用网络请求两次,这绝对没有必要。您可以使用 publish()
来解决它,这会将网络结果共享并多播到两个 Observable
,从而产生单个网络请求。
总而言之,我宁愿推荐在这里使用 Single,而不是 Flowable
,因为您没有背压奉献。类似于以下实现:
Observable<List<StoreModel>> publish = userStores()
.toObservable()
.publish(userStores ->
Single.zip(
userFavouriteStores(userStores.singleOrError()),
userOtherStores(userStores.singleOrError()),
(favoriteStores, otherStores) -> {
favoriteStores.addAll(otherStores);
return favoriteStores;
}
)
.toObservable()
);