RxJS:链接的 Observables 不使用相同的根 Observable 值

RxJS: Chained Observables Not Using the Same Root Observable Values

我正在尝试学习 RxJS,并且我正在尝试构建我认为简单明了的示例。创建在某个时间间隔内生成的随机数的可观察值,然后使用基于该可观察值的其他两个可观察值来跟踪已看到的最低值和最高值。

rngStream 的行为似乎符合预期,minStream 和 maxStream 似乎也能正确跟踪。我遇到的问题是,执行此操作时,rngStream、minStream 和 maxStream 似乎在每个间隔上都有不同的随机数。我正在尝试了解这是否符合预期,或者我是否只是设置不正确。

我的目标是这样的输出:

[27, 27, 27]
[13, 13, 27]
[90, 13, 90]
[42, 13, 90]
...

let rngStream = Rx.Observable
  .interval(1000)
  .map(() => Math.ceil(Math.random()*100))
  .take(5);

// Track the lowest number we've seen.
let minStream = rngStream
  .startWith(100)
  .scan((x, y) => Math.min(x, y))

// Track the highest number we've seen.
let maxStream = rngStream
  .startWith(0)
  .scan((x, y) => Math.max(x, y))

Rx.Observable.zip(rngStream, minStream, maxStream)
  .subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.2/Rx.min.js"></script>

https://www.youtube.com/watch?v=3LKMwkuK0ZE

这段视频其实让我明白了很多事情。特别是在 32:00 标记处,他提到了 .share ,这使得一个可观察的多播。默认情况下,每个订阅者都会得到自己的副本。所以修复看起来像这样:

let rngStream = Rx.Observable
  .interval(1000)
  .map(() => Math.ceil(Math.random()*100))
  .take(5)
  .share();

// Track the lowest number we've seen.
let minStream = rngStream
  .startWith(100)
  .scan((x, y) => Math.min(x, y))

// Track the highest number we've seen.
let maxStream = rngStream
  .startWith(0)
  .scan((x, y) => Math.max(x, y))

Rx.Observable.zip(rngStream, minStream, maxStream)
  .subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.2/Rx.min.js"></script>