如何干净地重新连接到 ReplaySubject,同时避免过去的记忆副作用?

How to cleanly reconnect to a ReplaySubject while avoiding past memoization side-effects?

我将状态保存在一个 ReplaySubject 中,它会重播状态的最后一个副本。从该状态衍生出其他 ReplaySubjects 来保持......好吧,衍生状态。每个重播主题只需要保持其最后计算的 state/derived 状态。 (我们不使用 BehaviorSubjects 因为它们 总是 给出一个值,但我们只想要一个从我们的父 observables 派生的值。) 它总是必要的如果我们已经生成派生状态,则将值重播给新订阅者。

我有一个自定义的 observable 运算符,可以按照我想要的方式完成此操作,但感觉并不那么干净。我觉得应该有一种有效的方法可以让 RxJ 的操作员自己完成这个任务。

我已经尝试了两种最明显的方法,但每种方法都存在一些小问题。问题涉及取消订阅和重新订阅。 打开下面的 fiddle,打开您的控制台,然后单击 运行。我将描述每个输出的问题。

https://jsfiddle.net/gfe1nryp/1/

refCounted ReplaySubject

的问题
=== RefCounted Observable ===


Work
Subscription 1: 1
Work
Subscription 1: 2
Work
Subscription 1: 3
Unsubscribe
Resubscribe
Subscription 2: 3
Work
Subscription 2: 6
Work
Subscription 2: 7
Work
Subscription 2: 8

这很好用,中间函数在没有订阅的情况下不做任何工作。但是,一旦我们重新订阅。我们可以看到订阅2重播了取消订阅前的最后一个状态,然后在base$状态下根据当前值播放派生状态。这并不理想。

connected ReplaySubject

的问题
=== Hot Observable ===
Work
Subscription 1: 1
Work
Subscription 1: 2
Work
Subscription 1: 3
Unsubscribe
Work
Work
Work
Resubscribe
Subscription 2: 6
Work
Subscription 2: 7
Work
Subscription 2: 8

这个与 refCounted observable 没有相同的问题,没有不必要的重播取消订阅前的最后一个状态。然而,由于 observable 现在很热,权衡是每当新值进入 base$ 状态时我们总是工作,即使该值没有被任何订阅使用。

最后,我们有了自定义运算符:

=== Custom Observable ===
Work
Subscription 1: 1
Work
Subscription 1: 2
Work
Subscription 1: 3
Unsubscribe
Resubscribe
Work
Subscription 2: 6
Work
Subscription 2: 7
Work
Subscription 2: 8

啊,两全其美。它不仅不会不必要地重播取消订阅前的最后一个值,而且在没有订阅时也不会做任何不必要的工作。 这是通过手动创建 RefCountReplaySubject 的组合来实现的。我们跟踪每个订阅者,当它达到 0 时,我们刷新重播值。它的代码在这里(当然在 fiddle 中):

Rx.Observable.prototype.selectiveReplay = function() {
  let subscribers = [];
  let innerSubscription;

  let storage = null;

  return Rx.Observable.create(observer => {
    if (subscribers.length > 0) {
      observer.next(storage);
    }

    subscribers.push(observer);

    if (!innerSubscription) {
      innerSubscription = this.subscribe(val => {
        storage = val;
        subscribers.forEach(subscriber => subscriber.next(val))
      });
    }

    return () => {
      subscribers = subscribers.filter(subscriber => subscriber !== observer);

      if (subscribers.length === 0) {
        storage = null;
        innerSubscription.unsubscribe();
        innerSubscription = null;
      }
    };
  });
};

那么,这个自定义的可观察对象已经可以工作了。但是,这可以只用 RxJS 运算符来完成吗?请记住,可能会有不止几个这样的主题链接在一起。在这个例子中,我只使用一个链接到 base$ 来说明我在最基本级别尝试过的两种普通方法的问题。 基本上,如果你只能使用 RxJS 运算符,并使输出与上面 === Custom Observable === 的输出相匹配。这就是我要找的。谢谢!

您应该能够将 multicast 与主题工厂而不是主题一起使用。比照。 https://jsfiddle.net/pto7ngov/1/

(function(){
  console.log('=== RefCounted Observable ===');
  var base$ = new Rx.ReplaySubject(1);

  var listen$ = base$.map(work).multicast(()=> new Rx.ReplaySubject(1)).refCount();

  var subscription1 = listen$.subscribe(x => console.log('Subscription 1: ' + x));

  base$.next(1);
  base$.next(2);
  base$.next(3);

    console.log('Unsubscribe');
  subscription1.unsubscribe();

  base$.next(4);
  base$.next(5);
  base$.next(6);

    console.log('Resubscribe');
  var subscription2 = listen$.subscribe(x => console.log('Subscription 2: ' + x));

  base$.next(7);
  base$.next(8);
})();

multicast operator 的这种重载完全符合您的用例。每次 multicast 运算符返回的可观察对象完成并重新连接时,它都会使用提供的工厂创建一个新主题。虽然它没有很好的记录,但它基本上从 Rxjs v4.

复制了一个现有的 API

如果我误解了或者不起作用请告诉我,