RxJS:如何仅在订阅外部 observable/subject 时订阅内部观察?
RxJS: How to subscribe to inner observable only when outer observable/subject is subscribed to?
背景
使用 Typescript 和 RxJS,我正在尝试扩展一个行为主体,以便它将一个可观察对象作为构造参数,然后订阅它,以便将可观察对象的每个值都设置为行为主体的值.
问题
当行为主体本身至少有一个订阅者时,如何才能使行为主体仅订阅内部可观察对象?
代码
import { Observable } from 'rxjs/Observable';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
export class SubjectWithInnerObservable<T> extends BehaviorSubject<T> {
constructor(
initialState: any,
someObservable: Observable<T>
) {
super(initialState);
// I only want this subscription to exist when there are one or more subscribers to the behavior subject
someObservable.subscribe(this);
}
}
您可以在 websocket 上使用 .publishBehavior()
运算符,它内部有一个 BehaviorSubject。
'publish' 位意味着它在连接之前不会发射,就像软管上的水龙头一样。
添加 .refCount()
运算符,您可以在订阅时自动连接,即由订阅计数控制的点击。
文档:RefCount
演示
// mock websocket
const ws = new Rx.Subject()
const autoConnected = ws
.do(x => console.log('ws emits', x)) // just to show ws stream
.publishBehavior(null) // make connectable, i.e only emit when subscribed
.refCount() // auto-connect on subscribe
.filter(x => x) // filter out that pesky initial value
ws.next(1) // before subscription - never emits
const subscription = autoConnected.subscribe(x => console.log('1st subscription', x))
ws.next(2)
ws.next(3)
subscription.unsubscribe()
ws.next(4) // after unsubscribe - never emits
autoConnected.subscribe(x => console.log('2nd subscription', x))
ws.next(5) // after re-subscription - emits last plus next
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>
背景
使用 Typescript 和 RxJS,我正在尝试扩展一个行为主体,以便它将一个可观察对象作为构造参数,然后订阅它,以便将可观察对象的每个值都设置为行为主体的值.
问题
当行为主体本身至少有一个订阅者时,如何才能使行为主体仅订阅内部可观察对象?
代码
import { Observable } from 'rxjs/Observable';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
export class SubjectWithInnerObservable<T> extends BehaviorSubject<T> {
constructor(
initialState: any,
someObservable: Observable<T>
) {
super(initialState);
// I only want this subscription to exist when there are one or more subscribers to the behavior subject
someObservable.subscribe(this);
}
}
您可以在 websocket 上使用 .publishBehavior()
运算符,它内部有一个 BehaviorSubject。
'publish' 位意味着它在连接之前不会发射,就像软管上的水龙头一样。
添加 .refCount()
运算符,您可以在订阅时自动连接,即由订阅计数控制的点击。
文档:RefCount
演示
// mock websocket
const ws = new Rx.Subject()
const autoConnected = ws
.do(x => console.log('ws emits', x)) // just to show ws stream
.publishBehavior(null) // make connectable, i.e only emit when subscribed
.refCount() // auto-connect on subscribe
.filter(x => x) // filter out that pesky initial value
ws.next(1) // before subscription - never emits
const subscription = autoConnected.subscribe(x => console.log('1st subscription', x))
ws.next(2)
ws.next(3)
subscription.unsubscribe()
ws.next(4) // after unsubscribe - never emits
autoConnected.subscribe(x => console.log('2nd subscription', x))
ws.next(5) // after re-subscription - emits last plus next
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>