RxJS 5 refCount() 连接或取消订阅源时的执行逻辑
Execute logic when RxJS 5 refCount() connects to or unsubscribes from source
根据 Multicasting
上的 RxJS 5 手册部分
...we can use ConnectableObservable's refCount() method (reference counting), which returns an Observable that keeps track of how many subscribers it has. When the number of subscribers increases from 0 to 1, it will call connect() for us, which starts the shared execution. Only when the number of subscribers decreases from 1 to 0 will it be fully unsubscribed, stopping further execution.
我想了解是否有可能挂接到这些事件中的每一个并执行一些逻辑,理想情况下是在源可观察对象的 connect()
或 unsubscribe()
发生之前,但即使在事实发生之后可以接受。
如果在使用 refCount()
运算符时无法做到这一点,如果您能提供一个示例如何使用自定义运算符实现这一点,我们将不胜感激。
我想也许我可以以某种方式使用 do(nextFn,errFn,completeFn)
中的 completeFn 来连接它,但似乎并没有像下面的代码片段所示那样工作。
var source = Rx.Observable.interval(500)
.do(
(x) => console.log('SOURCE emitted ' + x),
(err) => console.log('SOURCE erred ' + err),
() => console.log('SOURCE completed ')
);
var subject = new Rx.Subject();
var refCounted = source.multicast(subject).refCount();
var subscription1, subscription2, subscriptionConnect;
// This calls `connect()`, because
// it is the first subscriber to `refCounted`
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
setTimeout(() => {
console.log('observerB subscribed');
subscription2 = refCounted.subscribe({
next: (v) => console.log('observerB: ' + v)
});
}, 600);
setTimeout(() => {
console.log('observerA unsubscribed');
subscription1.unsubscribe();
}, 1200);
// This is when the shared Observable execution will stop, because
// `refCounted` would have no more subscribers after this
setTimeout(() => {
console.log('observerB unsubscribed');
subscription2.unsubscribe();
}, 2000);
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>
您可以在实际直播之前使用 .do(null,null, onComplete)
并在 completion/unsubscribe 之后使用 .finally()
的组合,以便在订阅之前和 completion/unsubscribe 之后举办活动:
const source = Rx.Observable.empty()
.do(null,null, () => console.log('subscribed'))
.concat(Rx.Observable.interval(500))
.finally(() => console.log('unsubscribed'))
.publish().refCount();
const sub1 = source
.take(5)
.subscribe(
val => console.log('sub1 ' + val),
null,
() => console.log('sub1 completed')
);
const sub2 = source
.take(3)
.subscribe(
val => console.log('sub2 ' + val),
null,
() => console.log('sub2 completed')
);
// simulate late subscription setting refCount() from 0 to 1 again
setTimeout(() => {
source
.take(1)
.subscribe(
val => console.log('late sub3 val: ' + val),
null,
() => console.log('sub3 completed')
);
}, 4000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.2/Rx.js"></script>
根据 Multicasting
上的 RxJS 5 手册部分...we can use ConnectableObservable's refCount() method (reference counting), which returns an Observable that keeps track of how many subscribers it has. When the number of subscribers increases from 0 to 1, it will call connect() for us, which starts the shared execution. Only when the number of subscribers decreases from 1 to 0 will it be fully unsubscribed, stopping further execution.
我想了解是否有可能挂接到这些事件中的每一个并执行一些逻辑,理想情况下是在源可观察对象的 connect()
或 unsubscribe()
发生之前,但即使在事实发生之后可以接受。
如果在使用 refCount()
运算符时无法做到这一点,如果您能提供一个示例如何使用自定义运算符实现这一点,我们将不胜感激。
我想也许我可以以某种方式使用 do(nextFn,errFn,completeFn)
中的 completeFn 来连接它,但似乎并没有像下面的代码片段所示那样工作。
var source = Rx.Observable.interval(500)
.do(
(x) => console.log('SOURCE emitted ' + x),
(err) => console.log('SOURCE erred ' + err),
() => console.log('SOURCE completed ')
);
var subject = new Rx.Subject();
var refCounted = source.multicast(subject).refCount();
var subscription1, subscription2, subscriptionConnect;
// This calls `connect()`, because
// it is the first subscriber to `refCounted`
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
setTimeout(() => {
console.log('observerB subscribed');
subscription2 = refCounted.subscribe({
next: (v) => console.log('observerB: ' + v)
});
}, 600);
setTimeout(() => {
console.log('observerA unsubscribed');
subscription1.unsubscribe();
}, 1200);
// This is when the shared Observable execution will stop, because
// `refCounted` would have no more subscribers after this
setTimeout(() => {
console.log('observerB unsubscribed');
subscription2.unsubscribe();
}, 2000);
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>
您可以在实际直播之前使用 .do(null,null, onComplete)
并在 completion/unsubscribe 之后使用 .finally()
的组合,以便在订阅之前和 completion/unsubscribe 之后举办活动:
const source = Rx.Observable.empty()
.do(null,null, () => console.log('subscribed'))
.concat(Rx.Observable.interval(500))
.finally(() => console.log('unsubscribed'))
.publish().refCount();
const sub1 = source
.take(5)
.subscribe(
val => console.log('sub1 ' + val),
null,
() => console.log('sub1 completed')
);
const sub2 = source
.take(3)
.subscribe(
val => console.log('sub2 ' + val),
null,
() => console.log('sub2 completed')
);
// simulate late subscription setting refCount() from 0 to 1 again
setTimeout(() => {
source
.take(1)
.subscribe(
val => console.log('late sub3 val: ' + val),
null,
() => console.log('sub3 completed')
);
}, 4000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.2/Rx.js"></script>