Rx concatWith() return 只有第一个 Flowable 结果

Rx concatWith() return only first Flowable result

我已经发布了它们单独工作的所有方法,但我遇到了第一个问题,其中我 concatWith() 两个可流动对象

    return userFavouriteStores()
    .concatWith(userOtherStores())
   .doOnNext(new Consumer<List<StoreModel>>() {
      @Override
      public void accept(@io.reactivex.annotations.NonNull List<StoreModel> storeModels) throws Exception {
        Log.i("storeModels", "" + storeModels);
      }
    })


public Flowable<List<StoreModel>> userFavouriteStores() {
return userStores()
    .map(UserStores::favoriteStores)
    .flatMap(storeId -> storeDao.storesWithIds(storeId))
    .map(stores -> { // TODO Konvert to Kotlin map {}
      List<StoreModel> result = new ArrayList<>(stores.size());
      for (se.ica.handla.repositories.local.Store store : stores) {
        result.add(store.toStoreModel(StoreModel.Source.Favourite));
      }
      return result;
    });  }

public Flowable<List<StoreModel>> userOtherStores() {
return userStores().map(UserStores::otherStores)
    .flatMap(storeId -> storeDao.storesWithIds(storeId))
    .map(stores -> {
      List<StoreModel> result = new ArrayList<>(stores.size());
      for (Store store : stores) {
        result.add(store.toStoreModel(StoreModel.Source.Other));
      }
      return result;
    });}

更新方法:userStores() 用于最喜欢的商店和其他商店,

  private Flowable<UserStores> userStores() {
return apiIcaSeResource
    .userStores()
    .toFlowable(); }

  @GET("user/stores")
Single<UserStores> userStores();

在跟进评论和其他信息之后,您对 concat() 没有特别的问题,我假设它是有效的,它只是不是您想要实现的工具这里。

concat() 不会将两个列表连接成一个列表,但 rathe 将首先发出第一个 Flowable 的所有项目,然后才发出第二个 Flowable 的项目(因此你必须有 onComplete 所以 concat 会知道什么时候 Flowable 结束,我一开始就问了。

为了将列表组合在一起,我建议压缩两个存储 Obesrvables(收藏夹/其他),然后简单地组合到列表以获得组合列表的单个输出。
除此之外,正如您所指出的,由于两个商店 Observables 来自 userStores(),您将调用网络请求两次,这绝对没有必要。您可以使用 publish() 来解决它,这会将网络结果共享并多播到两个 Observable,从而产生单个网络请求。

总而言之,我宁愿推荐在这里使用 Single,而不是 Flowable,因为您没有背压奉献。类似于以下实现:

Observable<List<StoreModel>> publish = userStores()
        .toObservable()
        .publish(userStores ->
                Single.zip(
                    userFavouriteStores(userStores.singleOrError()),
                    userOtherStores(userStores.singleOrError()),
                    (favoriteStores, otherStores) -> {
                         favoriteStores.addAll(otherStores);
                         return favoriteStores;
                    }
                )
                    .toObservable()
        );