"current value" 独立于订阅者的概念

Notion of "current value" independent of subscribers

我如何构造一个仅 "keeps going" 的可观察对象 独立于任何订阅者(即 refCount 等不在 问题)并为迟到的订阅者提供最新值?

这是我尝试过的:

// Approach 1
const myObservable$ = Rx.Observable.timer(0, 1000).publish();
myObservable.connect();
myObservable$.subscribe(x => console.log(x));
setTimeOut(function () {
  myObservable$.subscribe(x => console.log("late", x));
}, 3500);

// 0
// 1
// 2
// 3
// 4
// late 4
// 4
// late 5
// ...

方法 1 的问题是迟到的订阅者在 t=3.5s 没有得到 "current value" 3. 我想要的是

的输出
// 0
// 1
// 2
// 3
// late 3
// 4
// late 4
// ...

另一种方法使用 publishValue:

// Approach 2
const myObservable$ = Rx.Observable.timer(0, 1000).publishValue();
myObservable.connect();
myObservable$.subscribe(x => console.log(x));
setTimeOut(function () {
  myObservable$.subscribe(x => console.log("late", x));
}, 3500);

// undefined
// 0
// 1
// 2
// 3
// late 3
// 4
// late 4
// ...

在方法 2 中,迟到的订阅者在 t = 3.5 秒时获得 "correct" 值。 这种方法的问题是我们需要提供一个初始的 我们可能并不总是拥有的价值。

// Approach 3
const myObservable$ = Rx.Observable.timer(0, 1000).replay(1);
myObservable.connect();
myObservable$.subscribe(x => console.log(x));
setTimeOut(function () {
  myObservable$.subscribe(x => console.log("late", x));
}, 3500);

// 0
// 1
// 2
// 3
// late 0
// late 1
// late 2
// late 3
// 4
// late 4
// ...

此时我迷路了。我的印象是 .replay(1) 可能会解决我的问题,但不知何故它会重播不止一个事件。

有什么想法吗?

方法 3 是您问题的正确答案。但是,您使用的是 interface incorrectly.

  /**
   *
   * @example
   * var res = source.replay(null, 3);
   * var res = source.replay(null, 3, 500);
   * var res = source.replay(null, 3, 500, scheduler);
   * var res = source.replay(function (x) { return x.take(6).repeat(); }, 3, 500, scheduler);
   *
   * @param selector [Optional] Selector function which can use the multicasted source sequence as many times as needed, without causing multiple subscriptions to the source sequence. Subscribers to the given source will receive all the notifications of the source subject to the specified replay buffer trimming policy.
   * @param bufferSize [Optional] Maximum element count of the replay buffer.
   * @param windowSize [Optional] Maximum time length of the replay buffer.
   * @param scheduler [Optional] Scheduler where connected observers within the selector function will be invoked on.
   * @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source sequence within a selector function.
   */
Rx.Observable.prototype.replay([selector], [bufferSize], [window], [scheduler]) 

您需要使用第一个重载 source.replay(null, 3),因此您的代码应该是:

const myObservable$ = Rx.Observable.timer(0, 1000).replay(null, 1);
myObservable$.connect();
myObservable$.subscribe(x => console.log(x));
setTimeout(function () {
  myObservable$.subscribe(x => console.log("late", x));
}, 3500);

就是shareReplay啦!

const myObservable$ = Rx.Observable.timer(0, 1000).shareReplay(1);
myObservable$.subscribe(x => console.log(x));
setTimeout(function () {
  myObservable$.subscribe(x => console.log("late", x));
}, 3500);