RxJava - 将多个 Singles 和 return 的结果合并为一个

RxJava - Combining the results of multiple Singles and return as one

我遇到了以下问题: 我有一个 returns 对象列表的 Single,在每个对象内部,有一个也是 Single 的字段,并且 returns 一个项目列表(这是我需要的),但是我还需要将此列表作为一个结果返回,而不是作为每个单独的单个结果返回..

我有一个可以使用的 Observable,它包装了所有这些代码,所以我想在 onNext 中发出结果。

另外,我还需要将每个Single的单独结果添加到缓存中,key为父对象

代码:

object.getListOfObjects().doOnSuccess { objectList ->
                      objectList.map { singleItemInList ->
                          singleItemInList.listOfValues.subscribe({ valueList ->
                              cache[singleItemInList] = valueList
                              emitter.onNext(valueList)
                            // ^ this is the bit where i need to get all values instead of 
                            //   emitting on next for each item
                          },{
                              emitter.onError(it)
                          })
                      }
                  }.subscribe( {
                   /* no-op*/

                  }, {
                      emitter.onError(it)
                  })

如有任何帮助,我们将不胜感激!!

提前致谢:)

编辑 - 额外细节:

fun getListOfObjects(): Single<List<Object>> =
        Single.zip(
            allObjects().map { it.objects() }
        ) { t: Array<Any> ->
            t.map { it as Object }
        }

Object {
    [...]
    val listOfValues: Single<ValueList>
    [...]
}

在这种情况下和最后一步使用 flatMap 运算符 toList

object.getListOfObjects()
    .flatMap { objectList -> objectList.listOfValues() }
    .toList()
    .subscribe({ result -> TODO() }, { error -> TODO() })

我找到了一种方法,但它远非漂亮...任何关于如何更漂亮地实现它的建议将不胜感激! :D

object.getListOfObjects().doOnSuccess { objectList ->
                    val list = objectList.map { singleItemInList ->
                              singleItemInList.listOfValues.subscribe({ valueList ->
                                  cache[singleItemInList] = valueList
                              },{
                                  emitter.onError(it)
                              })
                          }
                      }.flatten().map {
                        it.toObservable()
                      }
                    Observable.zip(list) { itemList ->
                      val compoundList = mutableListOf<Value>()
                      for (i in itemList.indices) {
                        val singleList = itemList[i] as? List<Value>
                        singleList?.map { item ->
                            compoundList.add(item)
                        }
                    }
                    emitter.onNext(compoundList) 
                    // ^ this now emits the entire list instead of 
                    // each sublist individually
                    emitter.onComplete()
                }.subscribe({
                   /* no-op */
                }, {
                   emitter.onError(it)
                })
            }.subscribe({
                /* no-op */
            }, {
               emitter.onError(it)
            })