将 Observable<T[]> 缩减为 T[]

Reduce Observable<T[]> into T[]

如果我正在使用 Observable<number[]> 并且我想将所有排放物收集到一个 number[] 中。

我一直都是这样做的:

const items$ = Rx.Observable.of([10, 20, 30], [20,30,40])
  .reduce((acc, curr) => acc.concat(curr));

let itemsArray = [];

items.subscribe(arr => itemsArray = arr);

console.log(itemsArray); // [10, 20, 30, 20, 30, 40]

我只是想知道是否有任何方法可以从 Observable 中“收集”项目 - 在 RxJS 中内联而不调用 observable 上的订阅。

这会建议一种同步方法,鉴于可观察对象的异步性质,我不确定这种方法是否可行。

类似于:

const itemsArray = Rx.Observable.of([10, 20, 30], [20,30,40])
  .reduce((acc, curr) => acc.concat(curr))
  .collect( /* return a number[]  - not an Observable<number[]> */);


console.log(itemsArray); // [10, 20, 30, 20, 30, 40]

不订阅就无法完成。
这是一个流(冷),您需要订阅才能接收值。

就是说,我想你想要的是 而不是 等到最后一个值发出,然后随着时间的推移接收新的串联数组(这是完全可能的) .

为此使用的运算符是:scan.
事实上,您离 reduce 不远了 ;) !!

const items$ = Rx
  .Observable
  .of([10, 20, 30], [20,30,40])
  .scan((acc, curr) => acc.concat(curr), []);


items$.subscribe(arr => console.log(arr));

https://plnkr.co/edit/2e8eR6RrHrrE1WOuRRdf?p=preview

+1 Maxime 的回答。

为了完整起见,如果 ObservableScalarObservable,您可以找到 value 属性。

但这不是文档的一部分 API 所以考虑到可维护性,这不是一件好事。