RxJS 5 refCount() 连接或取消订阅源时的执行逻辑

Execute logic when RxJS 5 refCount() connects to or unsubscribes from source

根据 Multicasting

上的 RxJS 5 手册部分

...we can use ConnectableObservable's refCount() method (reference counting), which returns an Observable that keeps track of how many subscribers it has. When the number of subscribers increases from 0 to 1, it will call connect() for us, which starts the shared execution. Only when the number of subscribers decreases from 1 to 0 will it be fully unsubscribed, stopping further execution.

我想了解是否有可能挂接到这些事件中的每一个并执行一些逻辑,理想情况下是在源可观察对象的 connect()unsubscribe() 发生之前,但即使在事实发生之后可以接受。

如果在使用 refCount() 运算符时无法做到这一点,如果您能提供一个示例如何使用自定义运算符实现这一点,我们将不胜感激。

我想也许我可以以某种方式使用 do(nextFn,errFn,completeFn) 中的 completeFn 来连接它,但似乎并没有像下面的代码片段所示那样工作。

var source = Rx.Observable.interval(500)
  .do(
    (x) => console.log('SOURCE emitted ' + x),
    (err) => console.log('SOURCE erred ' + err),
    () => console.log('SOURCE completed ')
  );
var subject = new Rx.Subject();
var refCounted = source.multicast(subject).refCount();
var subscription1, subscription2, subscriptionConnect;

// This calls `connect()`, because
// it is the first subscriber to `refCounted`
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

setTimeout(() => {
  console.log('observerB subscribed');
  subscription2 = refCounted.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 600);

setTimeout(() => {
  console.log('observerA unsubscribed');
  subscription1.unsubscribe();
}, 1200);

// This is when the shared Observable execution will stop, because
// `refCounted` would have no more subscribers after this
setTimeout(() => {
  console.log('observerB unsubscribed');
  subscription2.unsubscribe();
}, 2000);
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>

您可以在实际直播之前使用 .do(null,null, onComplete) 并在 completion/unsubscribe 之后使用 .finally() 的组合,以便在订阅之前和 completion/unsubscribe 之后举办活动:

const source = Rx.Observable.empty()
  .do(null,null, () => console.log('subscribed'))
  .concat(Rx.Observable.interval(500))
  .finally(() => console.log('unsubscribed'))
  .publish().refCount();

const sub1 = source
  .take(5)
   .subscribe(
     val => console.log('sub1 ' + val),
     null, 
     () => console.log('sub1 completed')
   );
const sub2 = source
  .take(3)
  .subscribe(
    val => console.log('sub2 ' + val), 
    null, 
    () => console.log('sub2 completed')
  );

// simulate late subscription setting refCount() from 0 to 1 again                      
setTimeout(() => {
  source
    .take(1)
    .subscribe(
      val => console.log('late sub3 val: ' + val),
      null, 
      () => console.log('sub3 completed')
    );
 
}, 4000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.2/Rx.js"></script>