RxJS:如何仅在订阅外部 observable/subject 时订阅内部观察?

RxJS: How to subscribe to inner observable only when outer observable/subject is subscribed to?

背景

使用 Typescript 和 RxJS,我正在尝试扩展一个行为主体,以便它将一个可观察对象作为构造参数,然后订阅它,以便将可观察对象的每个值都设置为行为主体的值.

问题

当行为主体本身至少有一个订阅者时,如何才能使行为主体仅订阅内部可观察对象?

代码

import { Observable } from 'rxjs/Observable';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';

export class SubjectWithInnerObservable<T> extends BehaviorSubject<T> {

    constructor(
        initialState: any,
        someObservable: Observable<T>
    ) {
        super(initialState);

        // I only want this subscription to exist when there are one or more subscribers to the behavior subject
        someObservable.subscribe(this);
    }
}

您可以在 websocket 上使用 .publishBehavior() 运算符,它内部有一个 BehaviorSubject。

'publish' 位意味着它在连接之前不会发射,就像软管上的水龙头一样。

添加 .refCount() 运算符,您可以在订阅时自动连接,即由订阅计数控制的点击。

文档:RefCount

演示

// mock websocket
const ws = new Rx.Subject()

const autoConnected = ws
  .do(x => console.log('ws emits', x)) // just to show ws stream
  .publishBehavior(null)  // make connectable, i.e only emit when subscribed
  .refCount()             // auto-connect on subscribe
  .filter(x => x)         // filter out that pesky initial value

ws.next(1) // before subscription - never emits

const subscription = autoConnected.subscribe(x => console.log('1st subscription', x))
ws.next(2)
ws.next(3)

subscription.unsubscribe()
ws.next(4)  // after unsubscribe - never emits

autoConnected.subscribe(x => console.log('2nd subscription', x))
ws.next(5) // after re-subscription - emits last plus next
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>