RxJS:如何发出原始值,然后在完成时减少?
RxJS: How to emit original values, then reduce upon completion?
我想从 RxJS 流发出所有原始值,然后在完成后发出摘要。
Reduce 停止发射原始值。扫描发出每个总数而不是原始值。
这是我的 hacky 解决方案:
let total = {
total: 0
};
Rx.Observable.range(1, 3)
.do(val => {
total.total += val;
})
.concat(Rx.Observable.of(total))
.subscribe(
value => {
console.log('Next:', value)
}
);
// Next: 1
// Next: 2
// Next: 3
// Next: { total: 6 }
使用纯 RxJS 流执行此操作的简单方法是什么?
怎么样?
let source = Observable.range(1, 3).share();
let totalOb = source
.reduce((total, value) => total + value, 0);
source
.concat(totalOb)
.subscribe( value => console.log(`Next: ${value}`) );
输出:
Next: 1
Next: 2
Next: 3
Next: 6
您可以使用 throw
和 catch
来分隔数据和摘要。
let source = Observable.range(1, 3).share();
let totalOb = source
.reduce((total, value) => total + value, 0)
.mergeMap(total => Observable.throw(total));
source
.concat(totalOb)
.subscribe(
value => console.log(`Next: ${value}`),
value => console.log(`Total: ${value}`)
);
输出:
Next: 1
Next: 2
Next: 3
Total: 6
作为替代方案,您可以避免使用 share()
并制作两个 Observable 链而只制作一个链:
Observable.range(1, 3)
.concat(Observable.of(null))
.scan((obj, curr) => {
if (curr) {
obj.acc.push(curr);
}
obj.curr = curr;
return obj;
}, { acc: [], curr: 0 })
.map(obj => obj.curr === null
? { total: (obj.acc.reduce((acc, curr) => acc + curr, 0)) } // count total
: obj.curr // just return current item
)
.subscribe(console.log);
这会打印出您期望的结果:
1
2
3
{ total: 6 }
尽管使用 share()
看起来非常简单,但请注意,实际上您订阅了源 Observable 两次。实际上,这对您来说可能不是问题,具体取决于您将使用的 Observable 来源。
试试看,每个数字都打印两次:
let source = Observable.range(1, 3).do(console.log).share();
使用多播
Rx.Observable.range(1, 3)
.multicast(new Rx.Subject(), (shared)=> {
return Rx.Observable.merge(shared, shared.reduce((acc, x)=>acc+x,0))
})
.subscribe(x=>console.log(x))
我想从 RxJS 流发出所有原始值,然后在完成后发出摘要。
Reduce 停止发射原始值。扫描发出每个总数而不是原始值。
这是我的 hacky 解决方案:
let total = {
total: 0
};
Rx.Observable.range(1, 3)
.do(val => {
total.total += val;
})
.concat(Rx.Observable.of(total))
.subscribe(
value => {
console.log('Next:', value)
}
);
// Next: 1
// Next: 2
// Next: 3
// Next: { total: 6 }
使用纯 RxJS 流执行此操作的简单方法是什么?
怎么样?
let source = Observable.range(1, 3).share();
let totalOb = source
.reduce((total, value) => total + value, 0);
source
.concat(totalOb)
.subscribe( value => console.log(`Next: ${value}`) );
输出:
Next: 1
Next: 2
Next: 3
Next: 6
您可以使用 throw
和 catch
来分隔数据和摘要。
let source = Observable.range(1, 3).share();
let totalOb = source
.reduce((total, value) => total + value, 0)
.mergeMap(total => Observable.throw(total));
source
.concat(totalOb)
.subscribe(
value => console.log(`Next: ${value}`),
value => console.log(`Total: ${value}`)
);
输出:
Next: 1
Next: 2
Next: 3
Total: 6
作为替代方案,您可以避免使用 share()
并制作两个 Observable 链而只制作一个链:
Observable.range(1, 3)
.concat(Observable.of(null))
.scan((obj, curr) => {
if (curr) {
obj.acc.push(curr);
}
obj.curr = curr;
return obj;
}, { acc: [], curr: 0 })
.map(obj => obj.curr === null
? { total: (obj.acc.reduce((acc, curr) => acc + curr, 0)) } // count total
: obj.curr // just return current item
)
.subscribe(console.log);
这会打印出您期望的结果:
1
2
3
{ total: 6 }
尽管使用 share()
看起来非常简单,但请注意,实际上您订阅了源 Observable 两次。实际上,这对您来说可能不是问题,具体取决于您将使用的 Observable 来源。
试试看,每个数字都打印两次:
let source = Observable.range(1, 3).do(console.log).share();
使用多播
Rx.Observable.range(1, 3)
.multicast(new Rx.Subject(), (shared)=> {
return Rx.Observable.merge(shared, shared.reduce((acc, x)=>acc+x,0))
})
.subscribe(x=>console.log(x))