如何干净地重新连接到 ReplaySubject,同时避免过去的记忆副作用?
How to cleanly reconnect to a ReplaySubject while avoiding past memoization side-effects?
我将状态保存在一个 ReplaySubject
中,它会重播状态的最后一个副本。从该状态衍生出其他 ReplaySubjects
来保持......好吧,衍生状态。每个重播主题只需要保持其最后计算的 state/derived 状态。 (我们不使用 BehaviorSubjects
因为它们 总是 给出一个值,但我们只想要一个从我们的父 observables 派生的值。) 它总是必要的如果我们已经生成派生状态,则将值重播给新订阅者。
我有一个自定义的 observable 运算符,可以按照我想要的方式完成此操作,但感觉并不那么干净。我觉得应该有一种有效的方法可以让 RxJ 的操作员自己完成这个任务。
我已经尝试了两种最明显的方法,但每种方法都存在一些小问题。问题涉及取消订阅和重新订阅。
打开下面的 fiddle,打开您的控制台,然后单击 运行。我将描述每个输出的问题。
https://jsfiddle.net/gfe1nryp/1/
refCount
ed ReplaySubject
的问题
=== RefCounted Observable ===
Work
Subscription 1: 1
Work
Subscription 1: 2
Work
Subscription 1: 3
Unsubscribe
Resubscribe
Subscription 2: 3
Work
Subscription 2: 6
Work
Subscription 2: 7
Work
Subscription 2: 8
这很好用,中间函数在没有订阅的情况下不做任何工作。但是,一旦我们重新订阅。我们可以看到订阅2重播了取消订阅前的最后一个状态,然后在base$
状态下根据当前值播放派生状态。这并不理想。
connect
ed ReplaySubject
的问题
=== Hot Observable ===
Work
Subscription 1: 1
Work
Subscription 1: 2
Work
Subscription 1: 3
Unsubscribe
Work
Work
Work
Resubscribe
Subscription 2: 6
Work
Subscription 2: 7
Work
Subscription 2: 8
这个与 refCount
ed observable 没有相同的问题,没有不必要的重播取消订阅前的最后一个状态。然而,由于 observable 现在很热,权衡是每当新值进入 base$
状态时我们总是工作,即使该值没有被任何订阅使用。
最后,我们有了自定义运算符:
=== Custom Observable ===
Work
Subscription 1: 1
Work
Subscription 1: 2
Work
Subscription 1: 3
Unsubscribe
Resubscribe
Work
Subscription 2: 6
Work
Subscription 2: 7
Work
Subscription 2: 8
啊,两全其美。它不仅不会不必要地重播取消订阅前的最后一个值,而且在没有订阅时也不会做任何不必要的工作。
这是通过手动创建 RefCount
和 ReplaySubject
的组合来实现的。我们跟踪每个订阅者,当它达到 0 时,我们刷新重播值。它的代码在这里(当然在 fiddle 中):
Rx.Observable.prototype.selectiveReplay = function() {
let subscribers = [];
let innerSubscription;
let storage = null;
return Rx.Observable.create(observer => {
if (subscribers.length > 0) {
observer.next(storage);
}
subscribers.push(observer);
if (!innerSubscription) {
innerSubscription = this.subscribe(val => {
storage = val;
subscribers.forEach(subscriber => subscriber.next(val))
});
}
return () => {
subscribers = subscribers.filter(subscriber => subscriber !== observer);
if (subscribers.length === 0) {
storage = null;
innerSubscription.unsubscribe();
innerSubscription = null;
}
};
});
};
那么,这个自定义的可观察对象已经可以工作了。但是,这可以只用 RxJS 运算符来完成吗?请记住,可能会有不止几个这样的主题链接在一起。在这个例子中,我只使用一个链接到 base$
来说明我在最基本级别尝试过的两种普通方法的问题。
基本上,如果你只能使用 RxJS 运算符,并使输出与上面 === Custom Observable ===
的输出相匹配。这就是我要找的。谢谢!
您应该能够将 multicast
与主题工厂而不是主题一起使用。比照。 https://jsfiddle.net/pto7ngov/1/
(function(){
console.log('=== RefCounted Observable ===');
var base$ = new Rx.ReplaySubject(1);
var listen$ = base$.map(work).multicast(()=> new Rx.ReplaySubject(1)).refCount();
var subscription1 = listen$.subscribe(x => console.log('Subscription 1: ' + x));
base$.next(1);
base$.next(2);
base$.next(3);
console.log('Unsubscribe');
subscription1.unsubscribe();
base$.next(4);
base$.next(5);
base$.next(6);
console.log('Resubscribe');
var subscription2 = listen$.subscribe(x => console.log('Subscription 2: ' + x));
base$.next(7);
base$.next(8);
})();
multicast
operator 的这种重载完全符合您的用例。每次 multicast
运算符返回的可观察对象完成并重新连接时,它都会使用提供的工厂创建一个新主题。虽然它没有很好的记录,但它基本上从 Rxjs v4.
复制了一个现有的 API
如果我误解了或者不起作用请告诉我,
我将状态保存在一个 ReplaySubject
中,它会重播状态的最后一个副本。从该状态衍生出其他 ReplaySubjects
来保持......好吧,衍生状态。每个重播主题只需要保持其最后计算的 state/derived 状态。 (我们不使用 BehaviorSubjects
因为它们 总是 给出一个值,但我们只想要一个从我们的父 observables 派生的值。) 它总是必要的如果我们已经生成派生状态,则将值重播给新订阅者。
我有一个自定义的 observable 运算符,可以按照我想要的方式完成此操作,但感觉并不那么干净。我觉得应该有一种有效的方法可以让 RxJ 的操作员自己完成这个任务。
我已经尝试了两种最明显的方法,但每种方法都存在一些小问题。问题涉及取消订阅和重新订阅。 打开下面的 fiddle,打开您的控制台,然后单击 运行。我将描述每个输出的问题。
https://jsfiddle.net/gfe1nryp/1/
refCount
ed ReplaySubject
=== RefCounted Observable ===
Work
Subscription 1: 1
Work
Subscription 1: 2
Work
Subscription 1: 3
Unsubscribe
Resubscribe
Subscription 2: 3
Work
Subscription 2: 6
Work
Subscription 2: 7
Work
Subscription 2: 8
这很好用,中间函数在没有订阅的情况下不做任何工作。但是,一旦我们重新订阅。我们可以看到订阅2重播了取消订阅前的最后一个状态,然后在base$
状态下根据当前值播放派生状态。这并不理想。
connect
ed ReplaySubject
=== Hot Observable ===
Work
Subscription 1: 1
Work
Subscription 1: 2
Work
Subscription 1: 3
Unsubscribe
Work
Work
Work
Resubscribe
Subscription 2: 6
Work
Subscription 2: 7
Work
Subscription 2: 8
这个与 refCount
ed observable 没有相同的问题,没有不必要的重播取消订阅前的最后一个状态。然而,由于 observable 现在很热,权衡是每当新值进入 base$
状态时我们总是工作,即使该值没有被任何订阅使用。
最后,我们有了自定义运算符:
=== Custom Observable ===
Work
Subscription 1: 1
Work
Subscription 1: 2
Work
Subscription 1: 3
Unsubscribe
Resubscribe
Work
Subscription 2: 6
Work
Subscription 2: 7
Work
Subscription 2: 8
啊,两全其美。它不仅不会不必要地重播取消订阅前的最后一个值,而且在没有订阅时也不会做任何不必要的工作。
这是通过手动创建 RefCount
和 ReplaySubject
的组合来实现的。我们跟踪每个订阅者,当它达到 0 时,我们刷新重播值。它的代码在这里(当然在 fiddle 中):
Rx.Observable.prototype.selectiveReplay = function() {
let subscribers = [];
let innerSubscription;
let storage = null;
return Rx.Observable.create(observer => {
if (subscribers.length > 0) {
observer.next(storage);
}
subscribers.push(observer);
if (!innerSubscription) {
innerSubscription = this.subscribe(val => {
storage = val;
subscribers.forEach(subscriber => subscriber.next(val))
});
}
return () => {
subscribers = subscribers.filter(subscriber => subscriber !== observer);
if (subscribers.length === 0) {
storage = null;
innerSubscription.unsubscribe();
innerSubscription = null;
}
};
});
};
那么,这个自定义的可观察对象已经可以工作了。但是,这可以只用 RxJS 运算符来完成吗?请记住,可能会有不止几个这样的主题链接在一起。在这个例子中,我只使用一个链接到 base$
来说明我在最基本级别尝试过的两种普通方法的问题。
基本上,如果你只能使用 RxJS 运算符,并使输出与上面 === Custom Observable ===
的输出相匹配。这就是我要找的。谢谢!
您应该能够将 multicast
与主题工厂而不是主题一起使用。比照。 https://jsfiddle.net/pto7ngov/1/
(function(){
console.log('=== RefCounted Observable ===');
var base$ = new Rx.ReplaySubject(1);
var listen$ = base$.map(work).multicast(()=> new Rx.ReplaySubject(1)).refCount();
var subscription1 = listen$.subscribe(x => console.log('Subscription 1: ' + x));
base$.next(1);
base$.next(2);
base$.next(3);
console.log('Unsubscribe');
subscription1.unsubscribe();
base$.next(4);
base$.next(5);
base$.next(6);
console.log('Resubscribe');
var subscription2 = listen$.subscribe(x => console.log('Subscription 2: ' + x));
base$.next(7);
base$.next(8);
})();
multicast
operator 的这种重载完全符合您的用例。每次 multicast
运算符返回的可观察对象完成并重新连接时,它都会使用提供的工厂创建一个新主题。虽然它没有很好的记录,但它基本上从 Rxjs v4.
如果我误解了或者不起作用请告诉我,