从 RxJS promise 流

Stream from RxJS promise

我正在尝试 运行 在我根据承诺创建的流上 reduce

在下面的示例中,myApi.getItemsAsync() returns 一个数组。

我希望 reduce 回调会针对数组中的每个单独项目调用。相反,它被整个数组调用。

Rx.Observable.fromPromise(
  myApi.getItemsAsync()
)
.reduce(function (acc, item) {
  // expecting `item` to be a single item
  // instead, seeing the entire array
}, [])
.subscribe(function (result) { 
  console.log(result);
});

如果 myApi.getItemsAsync() 是一个同步函数,其中 returns 一个数组,reduce 按预期工作,调用数组中每个项目的回调。

我怎样才能让它与承诺一起工作?

reduce 正在处理整个流,而不是作为其一部分发出的数组。

如果您想使用数组方法,那么应该是

Rx.Observable.fromPromise(myApi.getItemsAsync())
.map(function(array) {
  return array.reduce(function (acc, item) {
    return …;
  }, []);
})
…

如果您希望使用流方法,则需要先将数组项注入流中。我想在您的同步版本中您使用了 fromArray。连同承诺,你会做

Rx.Observable.fromPromise(myApi.getItemsAsync())
.flatMap(function(array) {
  return Rx.Observable.fromArray(array);
}).reduce(function (acc, item) {
  return …;
}, [])
…