RxJS:如何发出原始值,然后在完成时减少?

RxJS: How to emit original values, then reduce upon completion?

我想从 RxJS 流发出所有原始值,然后在完成后发出摘要。

Reduce 停止发射原始值。扫描发出每个总数而不是原始值。

这是我的 hacky 解决方案:

let total = {
  total: 0
};

Rx.Observable.range(1, 3)
  .do(val => {
    total.total += val;
  })
  .concat(Rx.Observable.of(total))
  .subscribe(
    value => {
      console.log('Next:', value)
    }
  );

// Next: 1
// Next: 2
// Next: 3
// Next: { total: 6 }

使用纯 RxJS 流执行此操作的简单方法是什么?

怎么样?

let source = Observable.range(1, 3).share();

let totalOb = source
    .reduce((total, value) => total + value, 0);

source
    .concat(totalOb)
    .subscribe( value => console.log(`Next: ${value}`) );

输出:

Next: 1
Next: 2
Next: 3
Next: 6

您可以使用 throwcatch 来分隔数据和摘要。

let source = Observable.range(1, 3).share();

let totalOb = source
    .reduce((total, value) => total + value, 0)
    .mergeMap(total => Observable.throw(total));

source
    .concat(totalOb)
    .subscribe(
        value => console.log(`Next: ${value}`),
        value => console.log(`Total: ${value}`)
    );

输出:

Next: 1
Next: 2
Next: 3
Total: 6

作为替代方案,您可以避免使用 share() 并制作两个 Observable 链而只制作一个链:

Observable.range(1, 3)
    .concat(Observable.of(null))
    .scan((obj, curr) => {
        if (curr) {
            obj.acc.push(curr);
        }
        obj.curr = curr;
        return obj;
    }, { acc: [], curr: 0 })
    .map(obj => obj.curr === null
        ? { total: (obj.acc.reduce((acc, curr) => acc + curr, 0)) }  // count total
        : obj.curr  // just return current item
    )
    .subscribe(console.log);

这会打印出您期望的结果:

1
2
3
{ total: 6 }

尽管使用 share() 看起来非常简单,但请注意,实际上您订阅了源 Observable 两次。实际上,这对您来说可能不是问题,具体取决于您将使用的 Observable 来源。

试试看,每个数字都打印两次:

let source = Observable.range(1, 3).do(console.log).share();

使用多播

Rx.Observable.range(1, 3)
 .multicast(new Rx.Subject(), (shared)=> {
    return Rx.Observable.merge(shared, shared.reduce((acc, x)=>acc+x,0))
 })
.subscribe(x=>console.log(x))