RxJS - 我如何 map/switch 一个 Subject Observable 并为每个发射值添加一个平均计算?

RxJS - How can I map/switch a Subject Observable and add a mean calculation to every emitted value?

我是 RxJS 的新手,我想做以下事情:

假设我正在编写一种方法来跟踪一段时间内的温度。此方法将观察一个 Subject,它将发出如下值:[12, 49, -2, 26, 5, ...]

我怎样才能把它变成另一个 Observable,随着时间的推移为每个值增加平均值?

[
  {
    temperature: 12,
    mean: 12
  },
  {
    temperature: 49,
    mean: 30.5
  },
  {
    temperature: -2,
    mean: 19.67
  },
  {
    temperature: 26,
    mean: 21.25
  },
  {
    temperature: 5,
    mean: 18
  },
  ...
]

我遇到的困难是均值计算应该使用所有以前的值来完成。

有办法吗?我实际上需要添加更多数据并计算其他值,但这是我需要做的事情的要点。

像对数组进行 reduce 一样使用扫描。通过 { num: 0, total: 0, mean: 0 } 传入一个起始累加器,每次迭代都会增加 num,将当前温度添加到总温度并计算平均值。将 observables 视为随时间发生的数组有时有助于将它们可视化。

const { from, timer, zip } = rxjs;
const { scan } = rxjs.operators;

const temps = [12, 49, -2, 26, 5];

// Let's do it with an array
console.log(
  temps.reduce(
    (accumulator, temp) => ({
      num: accumulator.num + 1,
      temp: temp,
      total: accumulator.total + temp,
      mean: (accumulator.total + temp) / (accumulator.num + 1)
    }),
    { num: 0, temp: 0, total: 0, mean: 0 }
  )
);

const temps$ = from(temps);

const timer$ = timer(0, 1500);

var tempsOverTime$ = zip(temps$, timer$, (temp, _) => temp);

// Now let's do the same thing with an observable over time.
tempsOverTime$
  .pipe(
    scan(
      (accumulator, temp) => ({
        num: accumulator.num + 1,
        temp: temp,
        total: accumulator.total + temp,
        mean: (accumulator.total + temp) / (accumulator.num + 1)
      }),
      { num: 0, temp: 0, total: 0, mean: 0 }
    )
  )
  .subscribe(a => {
    console.log({ temp: a.temp, mean: a.mean });
  });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.3/rxjs.umd.min.js"></script>