每 x 毫秒计算一个平均值
Computing a mean value every x ms
我有一个流发出一些数据。
const data$ = Rx
.Observable
.interval(500)
.map(() => Math.random() * 100)
我想计算一段时间内所有产生的值的平均值(例如每秒)
如果 data$
流发出 -2-2-2-4|
,我希望生成的流是 ---2---3|
你会如何使用 rxjs
(版本 5)
bufferTime
是一条路:
Rx.Observable
.interval(500)
.map(() => Math.random() * 100)
.do(console.log)
.bufferTime(2000)
.map(items => {
return items.reduce((acc, cur) => acc + cur, 0) / items.length;
})
.take(3)
.subscribe(val => {
console.log('mean: ' + val);
})
<script src="https://npmcdn.com/@reactivex/rxjs@5.3.1/dist/global/Rx.js"></script>
好的,马克西姆的观点是有道理的。如果您需要计算大量值的平均值,请考虑使用 windowTime
:
var interval = Rx.Observable.interval(2000);
Rx.Observable
.interval(500)
.map(() => Math.random() * 100)
.do(console.log)
.window(interval)
.map(win => {
return win.reduce((acc, cur) => {
acc.count++;
acc.sum += cur;
return acc;
}, {
count: 0,
sum: 0,
});
})
.mergeAll()
.filter(val => val.count != 0)
.map(val => val.sum / val.count)
.take(3)
.subscribe(val => {
console.log('mean: ' + val);
})
<script src="https://npmcdn.com/@reactivex/rxjs@5.3.1/dist/global/Rx.js"></script>
如果您确实需要每秒捕获一个值,Sergey Sokolov 的回答可能没问题。但是,例如,它宁愿每小时一次,bufferTime
可能必须在内存中保留该时间段内的值。在这里,如果我理解你的问题,你不需要保留这些值。
所以这样做可能是更好的主意:
const { Observable } = Rx;
const tick$ = Observable.interval(2000);
const myObs$ = Observable
.interval(500)
.map(() => Math.random() * 100)
.do(x => console.log(`Emit ${x}`));
const finalObs$ = tick$
.withLatestFrom(myObs$)
.map(([_, val]) => val)
.do(x => console.warn(`Taken ${x}`));
// only to test
finalObs$.take(3).subscribe();
输出:
这里的每个 console.log
都只是为了调试,但在您最终的观察结果中,您只会从上面的屏幕截图中获得黄色的值。
这是一个可用的 Plunkr https://plnkr.co/edit/sR4Dg1gD3Zh2yWUtl0RI?p=preview
我有一个流发出一些数据。
const data$ = Rx
.Observable
.interval(500)
.map(() => Math.random() * 100)
我想计算一段时间内所有产生的值的平均值(例如每秒)
如果 data$
流发出 -2-2-2-4|
,我希望生成的流是 ---2---3|
你会如何使用 rxjs
(版本 5)
bufferTime
是一条路:
Rx.Observable
.interval(500)
.map(() => Math.random() * 100)
.do(console.log)
.bufferTime(2000)
.map(items => {
return items.reduce((acc, cur) => acc + cur, 0) / items.length;
})
.take(3)
.subscribe(val => {
console.log('mean: ' + val);
})
<script src="https://npmcdn.com/@reactivex/rxjs@5.3.1/dist/global/Rx.js"></script>
好的,马克西姆的观点是有道理的。如果您需要计算大量值的平均值,请考虑使用 windowTime
:
var interval = Rx.Observable.interval(2000);
Rx.Observable
.interval(500)
.map(() => Math.random() * 100)
.do(console.log)
.window(interval)
.map(win => {
return win.reduce((acc, cur) => {
acc.count++;
acc.sum += cur;
return acc;
}, {
count: 0,
sum: 0,
});
})
.mergeAll()
.filter(val => val.count != 0)
.map(val => val.sum / val.count)
.take(3)
.subscribe(val => {
console.log('mean: ' + val);
})
<script src="https://npmcdn.com/@reactivex/rxjs@5.3.1/dist/global/Rx.js"></script>
如果您确实需要每秒捕获一个值,Sergey Sokolov 的回答可能没问题。但是,例如,它宁愿每小时一次,bufferTime
可能必须在内存中保留该时间段内的值。在这里,如果我理解你的问题,你不需要保留这些值。
所以这样做可能是更好的主意:
const { Observable } = Rx;
const tick$ = Observable.interval(2000);
const myObs$ = Observable
.interval(500)
.map(() => Math.random() * 100)
.do(x => console.log(`Emit ${x}`));
const finalObs$ = tick$
.withLatestFrom(myObs$)
.map(([_, val]) => val)
.do(x => console.warn(`Taken ${x}`));
// only to test
finalObs$.take(3).subscribe();
输出:
这里的每个 console.log
都只是为了调试,但在您最终的观察结果中,您只会从上面的屏幕截图中获得黄色的值。
这是一个可用的 Plunkr https://plnkr.co/edit/sR4Dg1gD3Zh2yWUtl0RI?p=preview