使用 RxJS 将 Observable[A list] 转换为 Observable[B list]

Use RxJS to convert Observable[A list] to Observable[B list]

我想在JavaScript中找到申请Observable[A]的遍历函数。一、言外之意:

我有一个 Observable[A 列表],我想对列表中的每个项目并行应用一个函数 (A -> Observable[B]),并将它们全部加入一个 Observable[B 列表] ] 然后我就可以消费了。

或者在类型签名中,我想找到 this function: traverse : Observable[A list] -> (A -> Observable[B]) -> Observable[B list]。怎么做?

例如,您可以使用 source.flatMap(function(a_list){ return Rx.Observable.zip(a_list.map(f))}))

source :: Observable List A
a_list :: List A
f :: A -> Observable B
a_list.map(f) :: List Observable B
Rx.Observable.zip :: List Observable B -> Observable List B
flatMap :: Observable List A -> (List A -> Observable List B) -> Observable List B

Rx.Observable.combineLatest 应该与 zip 相同。 flatMap 可以替换为 Rx.Observable.flatMapLatest 如果它首先存在(仅存在于实例级别),但可以替换为 Rx.Observable.concatMap 如果你想尊重一些顺序。

所以您可以匹配所需的签名,但这是您想要的逻辑吗?只要 List Observable B 中的一个可观察对象完成,Zip 就会完成。 combineLatest 将为 List Observable B 中的所有可观察值重复先前发出的值,一个除外。这些中的任何一个都适用于您的用例吗?您还可以使用 zip 设计一个版本,该版本在列表中的所有可观察对象完成时完成:cf。 (去掉最后一行.concatMap(function (list) { return fromArray(list); }))。

比照。 http://www.introtorx.com/content/v1.0.10621.0/12_CombiningSequences.html 用于组合序列的运算符。

这就是我最终解决它的方法,我很可能会作为 'traverse' 向 RxJS PR 并获得更多反馈:

Rx.Observable.prototype.traverse = function(f) {
  return this.
    // await the value O[b list]
    concatMap(xs =>
      Rx.Observable.from(xs). // lift into Rx monad
        concatMap(x => f(x)). // flatten to O[b] in same order as xs
        toArray()); // map from O[b] to O[b list]
};

const myStream = // O[A list] -> O[B list]
  filesSubj.traverse(file => fileReader(file).asDataURL())