每 x 毫秒计算一个平均值

Computing a mean value every x ms

我有一个流发出一些数据。

const data$ = Rx
  .Observable
  .interval(500)
  .map(() => Math.random() * 100)

我想计算一段时间内所有产生的值的平均值(例如每秒)

如果 data$ 流发出 -2-2-2-4|,我希望生成的流是 ---2---3|

你会如何使用 rxjs(版本 5)

bufferTime是一条路:

Rx.Observable
  .interval(500)
  .map(() => Math.random() * 100)
  .do(console.log)
  .bufferTime(2000)
  .map(items => {
    return items.reduce((acc, cur) => acc + cur, 0) / items.length;
  })
  .take(3)
  .subscribe(val => {
    console.log('mean: ' + val);
  })
<script src="https://npmcdn.com/@reactivex/rxjs@5.3.1/dist/global/Rx.js"></script>

好的,马克西姆的观点是有道理的。如果您需要计算大量值的平均值,请考虑使用 windowTime:

var interval = Rx.Observable.interval(2000);

Rx.Observable
  .interval(500)
  .map(() => Math.random() * 100)
  .do(console.log)
  .window(interval)
  .map(win => {
    return win.reduce((acc, cur) => {
      acc.count++;
      acc.sum += cur;
      return acc;
    }, {
      count: 0,
      sum: 0,
    });
  })
  .mergeAll()
  .filter(val => val.count != 0)
  .map(val => val.sum / val.count)
  .take(3)
  .subscribe(val => {
    console.log('mean: ' + val);
  })
<script src="https://npmcdn.com/@reactivex/rxjs@5.3.1/dist/global/Rx.js"></script>

如果您确实需要每秒捕获一个值,Sergey Sokolov 的回答可能没问题。但是,例如,它宁愿每小时一次,bufferTime 可能必须在内存中保留该时间段内的值。在这里,如果我理解你的问题,你不需要保留这些值。

所以这样做可能是更好的主意:

const { Observable } = Rx;

const tick$ = Observable.interval(2000);

const myObs$ = Observable
  .interval(500)
  .map(() => Math.random() * 100)
  .do(x => console.log(`Emit ${x}`));

const finalObs$ = tick$
  .withLatestFrom(myObs$)
  .map(([_, val]) => val)
  .do(x => console.warn(`Taken ${x}`));

// only to test
finalObs$.take(3).subscribe();

输出:

这里的每个 console.log 都只是为了调试,但在您最终的观察结果中,您只会从上面的屏幕截图中获得黄色的值。

这是一个可用的 Plunkr https://plnkr.co/edit/sR4Dg1gD3Zh2yWUtl0RI?p=preview